You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/03/11 14:18:09 UTC

[camel] 01/03: CAMEL-14625: Support asynchronous message processing

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 968b29e5ac4816ae90574d0883b7508e96f2727c
Author: Masa Horiyama <ma...@toasttab.com>
AuthorDate: Tue Mar 10 03:01:51 2020 -0400

    CAMEL-14625: Support asynchronous message processing
---
 .../org/apache/camel/component/pulsar/pulsar.json  |   6 +-
 .../src/main/docs/pulsar-component.adoc            |   6 +-
 .../component/pulsar/PulsarConfiguration.java      |   4 +-
 .../camel/component/pulsar/PulsarEndpoint.java     |  23 +++
 .../component/pulsar/PulsarMessageListener.java    |  59 ++++--
 .../consumers/CommonCreationStrategyImpl.java      |   2 +-
 .../pulsar/PulsarConsumerInAsynchronousTest.java   | 193 ++++++++++++++++++++
 .../pulsar/PulsarConsumerInSynchronousTest.java    | 199 +++++++++++++++++++++
 .../dsl/PulsarComponentBuilderFactory.java         |  10 +-
 .../endpoint/dsl/PulsarEndpointBuilderFactory.java |  32 ++--
 .../modules/ROOT/pages/pulsar-component.adoc       |   6 +-
 11 files changed, 494 insertions(+), 46 deletions(-)

diff --git a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
index 36bd9a2..c202ca0 100644
--- a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
+++ b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
@@ -22,7 +22,7 @@
     "configuration": { "kind": "property", "displayName": "Configuration", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.pulsar.PulsarConfiguration", "deprecated": false, "secret": false, "description": "Allows to pre-configure the Pulsar component with common options that the endpoints will reuse." },
     "ackGroupTimeMillis": { "kind": "property", "displayName": "Ack Group Time Millis", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "100", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Group the consumer acknowledgments for the specified time in milliseconds - defaults to 100" },
     "ackTimeoutMillis": { "kind": "property", "displayName": "Ack Timeout Millis", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "10000", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Timeout for unacknowledged messages in milliseconds - defaults to 10000" },
-    "allowManualAcknowledgement": { "kind": "property", "displayName": "Allow Manual Acknowledgement", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "false", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Whether to allow manual message acknowledgements. If this option is enabled, then message [...]
+    "allowManualAcknowledgement": { "kind": "property", "displayName": "Allow Manual Acknowledgement", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "false", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Whether to allow manual message acknowledgements. If this option is enabled, then message [...]
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by [...]
     "consumerName": { "kind": "property", "displayName": "Consumer Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "sole-consumer", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Name of the consumer when subscription is EXCLUSIVE" },
     "consumerNamePrefix": { "kind": "property", "displayName": "Consumer Name Prefix", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "cons", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Prefix to add to consumer names when a SHARED or FAILOVER subscription is used" },
@@ -59,7 +59,7 @@
     "topic": { "kind": "path", "displayName": "Topic", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "secret": false, "description": "The topic" },
     "ackGroupTimeMillis": { "kind": "parameter", "displayName": "Ack Group Time Millis", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "100", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Group the consumer acknowledgments for the specified time in milliseconds - defaults to 100" },
     "ackTimeoutMillis": { "kind": "parameter", "displayName": "Ack Timeout Millis", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "10000", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Timeout for unacknowledged messages in milliseconds - defaults to 10000" },
-    "allowManualAcknowledgement": { "kind": "parameter", "displayName": "Allow Manual Acknowledgement", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "false", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Whether to allow manual message acknowledgements. If this option is enabled, then  [...]
+    "allowManualAcknowledgement": { "kind": "parameter", "displayName": "Allow Manual Acknowledgement", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "false", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Whether to allow manual message acknowledgements. If this option is enabled, then  [...]
     "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled b [...]
     "consumerName": { "kind": "parameter", "displayName": "Consumer Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "sole-consumer", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Name of the consumer when subscription is EXCLUSIVE" },
     "consumerNamePrefix": { "kind": "parameter", "displayName": "Consumer Name Prefix", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "cons", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Prefix to add to consumer names when a SHARED or FAILOVER subscription is used" },
@@ -87,6 +87,6 @@
     "producerName": { "kind": "parameter", "displayName": "Producer Name", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Name of the producer. If unset, lets Pulsar select a unique identifier." },
     "sendTimeoutMs": { "kind": "parameter", "displayName": "Send Timeout Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "30000", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Send timeout in milliseconds" },
     "basicPropertyBinding": { "kind": "parameter", "displayName": "Basic Property Binding", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities" },
-    "synchronous": { "kind": "parameter", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "false", "description": "Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported)." }
+    "synchronous": { "kind": "parameter", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "true", "description": "Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported)." }
   }
 }
diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index 25271f7..e9de8d2 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -41,7 +41,7 @@ The Pulsar component supports 32 options, which are listed below.
 | *configuration* (common) | Allows to pre-configure the Pulsar component with common options that the endpoints will reuse. |  | PulsarConfiguration
 | *ackGroupTimeMillis* (consumer) | Group the consumer acknowledgments for the specified time in milliseconds - defaults to 100 | 100 | long
 | *ackTimeoutMillis* (consumer) | Timeout for unacknowledged messages in milliseconds - defaults to 10000 | 10000 | long
-| *allowManualAcknowledgement* (consumer) | Whether to allow manual message acknowledgements. If this option is enabled, then messages are not immediately acknowledged after being consumed. Instead, an instance of PulsarMessageReceipt is stored as a header on the org.apache.camel.Exchange. Messages can then be acknowledged using PulsarMessageReceipt at any time before the ackTimeout occurs. | false | boolean
+| *allowManualAcknowledgement* (consumer) | Whether to allow manual message acknowledgements. If this option is enabled, then messages are not acknowledged automatically after successful route completion. Instead, an instance of PulsarMessageReceipt is stored as a header on the org.apache.camel.Exchange. Messages can then be acknowledged using PulsarMessageReceipt at any time before the ackTimeout occurs. | false | boolean
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
 | *consumerName* (consumer) | Name of the consumer when subscription is EXCLUSIVE | sole-consumer | String
 | *consumerNamePrefix* (consumer) | Prefix to add to consumer names when a SHARED or FAILOVER subscription is used | cons | String
@@ -107,7 +107,7 @@ with the following path and query parameters:
 | Name | Description | Default | Type
 | *ackGroupTimeMillis* (consumer) | Group the consumer acknowledgments for the specified time in milliseconds - defaults to 100 | 100 | long
 | *ackTimeoutMillis* (consumer) | Timeout for unacknowledged messages in milliseconds - defaults to 10000 | 10000 | long
-| *allowManualAcknowledgement* (consumer) | Whether to allow manual message acknowledgements. If this option is enabled, then messages are not immediately acknowledged after being consumed. Instead, an instance of PulsarMessageReceipt is stored as a header on the org.apache.camel.Exchange. Messages can then be acknowledged using PulsarMessageReceipt at any time before the ackTimeout occurs. | false | boolean
+| *allowManualAcknowledgement* (consumer) | Whether to allow manual message acknowledgements. If this option is enabled, then messages are not acknowledged automatically after successful route completion. Instead, an instance of PulsarMessageReceipt is stored as a header on the org.apache.camel.Exchange. Messages can then be acknowledged using PulsarMessageReceipt at any time before the ackTimeout occurs. | false | boolean
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
 | *consumerName* (consumer) | Name of the consumer when subscription is EXCLUSIVE | sole-consumer | String
 | *consumerNamePrefix* (consumer) | Prefix to add to consumer names when a SHARED or FAILOVER subscription is used | cons | String
@@ -135,7 +135,7 @@ with the following path and query parameters:
 | *producerName* (producer) | Name of the producer. If unset, lets Pulsar select a unique identifier. |  | String
 | *sendTimeoutMs* (producer) | Send timeout in milliseconds | 30000 | int
 | *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
-| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
+| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | true | boolean
 |===
 // endpoint options: END
 
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
index 2d48940..ad7d011 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
@@ -183,8 +183,8 @@ public class PulsarConfiguration implements Cloneable {
     /**
      * Whether to allow manual message acknowledgements.
      * <p/>
-     * If this option is enabled, then messages are not immediately acknowledged
-     * after being consumed. Instead, an instance of
+     * If this option is enabled, then messages are not acknowledged automatically
+     * after successful route completion. Instead, an instance of
      * {@link PulsarMessageReceipt} is stored as a header on the
      * {@link org.apache.camel.Exchange}. Messages can then be acknowledged
      * using {@link PulsarMessageReceipt} at any time before the ackTimeout
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
index afcdf4f..ffa941f 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
@@ -45,11 +45,17 @@ public class PulsarEndpoint extends DefaultEndpoint {
     @UriPath
     @Metadata(required = true)
     private String topic;
+
+    @UriParam(defaultValue = "true", label = "advanced",
+            description = "Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported).")
+    private boolean synchronous;
+
     @UriParam
     private PulsarConfiguration pulsarConfiguration;
 
     public PulsarEndpoint(String uri, PulsarComponent component) {
         super(uri, component);
+        this.synchronous = true;
     }
 
     @Override
@@ -119,6 +125,23 @@ public class PulsarEndpoint extends DefaultEndpoint {
         this.topic = topic;
     }
 
+    /**
+     * Returns whether synchronous processing should be strictly used.
+     */
+    @Override
+    public boolean isSynchronous() {
+        return synchronous;
+    }
+
+    /**
+     * Sets whether synchronous processing should be strictly used, or Camel is
+     * allowed to use asynchronous processing (if supported).
+     */
+    @Override
+    public void setSynchronous(boolean synchronous) {
+        this.synchronous = synchronous;
+    }
+
     public PulsarConfiguration getPulsarConfiguration() {
         return pulsarConfiguration;
     }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
index 11e87fb..34372b9 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
@@ -16,14 +16,14 @@
  */
 package org.apache.camel.component.pulsar;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
-import org.apache.camel.spi.ExceptionHandler;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,13 +32,11 @@ public class PulsarMessageListener implements MessageListener<byte[]> {
     private static final Logger LOGGER = LoggerFactory.getLogger(PulsarMessageListener.class);
 
     private final PulsarEndpoint endpoint;
-    private final ExceptionHandler exceptionHandler;
-    private final Processor processor;
+    private final PulsarConsumer pulsarConsumer;
 
-    public PulsarMessageListener(PulsarEndpoint endpoint, ExceptionHandler exceptionHandler, Processor processor) {
+    public PulsarMessageListener(PulsarEndpoint endpoint, PulsarConsumer pulsarConsumer) {
         this.endpoint = endpoint;
-        this.exceptionHandler = exceptionHandler;
-        this.processor = processor;
+        this.pulsarConsumer = pulsarConsumer;
     }
 
     @Override
@@ -47,20 +45,55 @@ public class PulsarMessageListener implements MessageListener<byte[]> {
 
         try {
             if (endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()) {
-                exchange.getIn().setHeader(PulsarMessageHeaders.MESSAGE_RECEIPT, endpoint.getComponent().getPulsarMessageReceiptFactory().newInstance(exchange, message, consumer));
-                processor.process(exchange);
+                exchange.getIn().setHeader(PulsarMessageHeaders.MESSAGE_RECEIPT,
+                        endpoint.getComponent().getPulsarMessageReceiptFactory()
+                                .newInstance(exchange, message, consumer));
+            }
+            if (endpoint.isSynchronous()) {
+                process(exchange, consumer, message);
             } else {
-                processor.process(exchange);
-                consumer.acknowledge(message.getMessageId());
+                processAsync(exchange, consumer, message);
             }
         } catch (Exception exception) {
             handleProcessorException(exchange, exception);
         }
     }
 
+    private void process(final Exchange exchange, final Consumer<byte[]> consumer, final Message<byte[]> message) throws Exception {
+        pulsarConsumer.getProcessor().process(exchange);
+        acknowledge(consumer, message);
+    }
+
+    private void processAsync(final Exchange exchange, final Consumer<byte[]> consumer, final Message<byte[]> message) {
+        pulsarConsumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
+            @Override
+            public void done(boolean doneSync) {
+                if (exchange.getException() != null) {
+                    handleProcessorException(exchange, exchange.getException());
+                } else {
+                    try {
+                        acknowledge(consumer, message);
+                    } catch (Exception e) {
+                        handleProcessorException(exchange, e);
+                    }
+                }
+            }
+        });
+    }
+
+    private void acknowledge(final Consumer<byte[]> consumer, final Message<byte[]> message)
+            throws PulsarClientException {
+        if (!endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()) {
+            consumer.acknowledge(message.getMessageId());
+        }
+    }
+
     private void handleProcessorException(final Exchange exchange, final Exception exception) {
-        final Exchange exchangeWithException = PulsarMessageUtils.updateExchangeWithException(exception, exchange);
+        final Exchange exchangeWithException = PulsarMessageUtils
+                .updateExchangeWithException(exception, exchange);
 
-        exceptionHandler.handleException("An error occurred", exchangeWithException, exception);
+        pulsarConsumer.getExceptionHandler()
+                .handleException("An error occurred", exchangeWithException, exception);
     }
+
 }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
index a411347..0f19172 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
@@ -39,7 +39,7 @@ public final class CommonCreationStrategyImpl {
             .subscriptionInitialPosition(endpointConfiguration.getSubscriptionInitialPosition().toPulsarSubscriptionInitialPosition())
             .acknowledgmentGroupTime(endpointConfiguration.getAckGroupTimeMillis(), TimeUnit.MILLISECONDS)
             .negativeAckRedeliveryDelay(endpointConfiguration.getNegativeAckRedeliveryDelayMicros(), TimeUnit.MICROSECONDS)
-            .messageListener(new PulsarMessageListener(pulsarEndpoint, pulsarConsumer.getExceptionHandler(), pulsarConsumer.getProcessor()));
+            .messageListener(new PulsarMessageListener(pulsarEndpoint, pulsarConsumer));
 
         if (endpointConfiguration.getMaxRedeliverCount() != null) {
             DeadLetterPolicyBuilder policy = DeadLetterPolicy.builder()
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInAsynchronousTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInAsynchronousTest.java
new file mode 100644
index 0000000..3b62564
--- /dev/null
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInAsynchronousTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
+import org.apache.camel.spi.Registry;
+import org.apache.camel.support.SimpleRegistry;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarConsumerInAsynchronousTest extends PulsarTestSupport {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumerInAsynchronousTest.class);
+
+    private static final String TOPIC_URI_SYNCHRONOUS_FALSE = "persistent://public/default/synchronousFalse";
+
+    private static final String TOPIC_URI_SYNCHRONOUS_FALSE_THROWS_EXCEPTION =
+            "persistent://public/default/synchronousFalseThrowsException";
+
+    private static final String TOPIC_URI_SYNCHRONOUS_FALSE_MANUAL_ACK =
+            "persistent://public/default/synchronousFalseManualAck";
+
+    private static final String PRODUCER = "camel-producer-1";
+
+    @EndpointInject("pulsar:" + TOPIC_URI_SYNCHRONOUS_FALSE + "?numberOfConsumers=1&subscriptionType=Exclusive"
+            + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
+            + "&synchronous=false")
+    private Endpoint synchronousFalse;
+
+    @EndpointInject("pulsar:" + TOPIC_URI_SYNCHRONOUS_FALSE_THROWS_EXCEPTION + "?numberOfConsumers=1&subscriptionType=Exclusive"
+            + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
+            + "&synchronous=false")
+    private Endpoint synchronousFalseThrowsException;
+
+    @EndpointInject("pulsar:" + TOPIC_URI_SYNCHRONOUS_FALSE_MANUAL_ACK + "?numberOfConsumers=1&subscriptionType=Exclusive"
+            + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
+            + "&synchronous=false" + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000")
+    private Endpoint synchronousFalseManualAck;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private CountDownLatch countDownLatch;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            Processor processor = new Processor() {
+                @Override
+                public void process(final Exchange exchange) throws InterruptedException {
+                    LOGGER.info("Processing message {}", exchange.getIn().getBody());
+                    countDownLatch.countDown();
+                    countDownLatch.await(20, TimeUnit.SECONDS);
+                }
+            };
+
+            Processor manualAckProcessor = new Processor() {
+                @Override
+                public void process(final Exchange exchange) throws PulsarClientException {
+                    LOGGER.info("Processing message {}", exchange.getIn().getBody());
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(
+                            PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    receipt.acknowledge();
+                }
+            };
+
+            @Override
+            public void configure() {
+
+                from(synchronousFalse)
+                        .threads(2)
+                            .process(processor)
+                        .end()
+                        .to(to);
+
+                from(synchronousFalseThrowsException)
+                        .threads(2)
+                            .throwException(new RuntimeException("Processor throws exception."))
+                        .end()
+                        .to(to);
+
+                from(synchronousFalseManualAck)
+                        .threads(2)
+                            .process(manualAckProcessor)
+                        .end()
+                        .to(to);
+            }
+        };
+    }
+
+    @Override
+    protected Registry createCamelRegistry() throws Exception {
+        SimpleRegistry registry = new SimpleRegistry();
+
+        registerPulsarBeans(registry);
+
+        return registry;
+    }
+
+    private void registerPulsarBeans(SimpleRegistry registry) throws PulsarClientException {
+        PulsarClient pulsarClient = givenPulsarClient();
+        AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
+
+        registry.bind("pulsarClient", pulsarClient);
+        PulsarComponent comp = new PulsarComponent(context);
+        comp.setAutoConfiguration(autoConfiguration);
+        comp.setPulsarClient(pulsarClient);
+        registry.bind("pulsar", comp);
+
+    }
+
+    private PulsarClient givenPulsarClient() throws PulsarClientException {
+        return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
+    }
+
+    @Test
+    public void testMessagesProcessedAsynchronously() throws Exception {
+        countDownLatch = new CountDownLatch(2);
+
+        to.expectedMessageCount(2);
+
+        Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER)
+                .topic(TOPIC_URI_SYNCHRONOUS_FALSE).create();
+
+        producer.send("One");
+        producer.send("Two");
+
+        MockEndpoint.assertIsSatisfied(2, TimeUnit.SECONDS, to);
+    }
+
+    @Test
+    public void testMessageProcessedAsynchronouslyThrowsException() throws Exception {
+        throwsException(TOPIC_URI_SYNCHRONOUS_FALSE_THROWS_EXCEPTION);
+    }
+
+    public void throwsException(String topic) throws Exception {
+        to.expectedMessageCount(0);
+        Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER)
+                .topic(topic).create();
+
+        producer.send("One");
+
+        MockEndpoint.assertIsSatisfied(2, TimeUnit.SECONDS, to);
+    }
+
+    @Test
+    public void testMessagesProcessedAsynchronouslyManualAcknowledge() throws Exception {
+        manualAcknowledgement(TOPIC_URI_SYNCHRONOUS_FALSE_MANUAL_ACK);
+    }
+
+    public void manualAcknowledgement(String topic) throws Exception {
+        to.expectsNoDuplicates(body());
+        to.expectedMessageCount(1);
+
+        Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER)
+                .topic(topic).create();
+
+        producer.send("Hello World!");
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+    }
+
+}
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInSynchronousTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInSynchronousTest.java
new file mode 100644
index 0000000..0e154fd
--- /dev/null
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInSynchronousTest.java
@@ -0,0 +1,199 @@
+package org.apache.camel.component.pulsar;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
+import org.apache.camel.spi.Registry;
+import org.apache.camel.support.SimpleRegistry;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarConsumerInSynchronousTest extends PulsarTestSupport {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumerInAsynchronousTest.class);
+
+    private static final String TOPIC_URI_SYNCHRONOUS_TRUE = "persistent://public/default/synchronousTrue";
+    private static final String TOPIC_URI_SYNCHRONOUS_DEFAULT = "persistent://public/default/synchronousDefault";
+
+    private static final String TOPIC_URI_SYNCHRONOUS_TRUE_THROWS_EXCEPTION =
+            "persistent://public/default/synchronousTrueThrowsException";
+
+    private static final String TOPIC_URI_SYNCHRONOUS_TRUE_MANUAL_ACK =
+            "persistent://public/default/synchronousTrueManualAck";
+
+    private static final String PRODUCER = "camel-producer-1";
+
+    @EndpointInject("pulsar:" + TOPIC_URI_SYNCHRONOUS_TRUE + "?numberOfConsumers=1&subscriptionType=Exclusive"
+            + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
+            + "&synchronous=true")
+    private Endpoint synchronousTrue;
+
+    @EndpointInject("pulsar:" + TOPIC_URI_SYNCHRONOUS_DEFAULT + "?numberOfConsumers=1&subscriptionType=Exclusive"
+            + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer")
+    private Endpoint synchronousDefault;
+
+    @EndpointInject("pulsar:" + TOPIC_URI_SYNCHRONOUS_TRUE_THROWS_EXCEPTION + "?numberOfConsumers=1&subscriptionType=Exclusive"
+            + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
+            + "&synchronous=true")
+    private Endpoint synchronousTrueThrowsException;
+
+    @EndpointInject("pulsar:" + TOPIC_URI_SYNCHRONOUS_TRUE_MANUAL_ACK + "?numberOfConsumers=1&subscriptionType=Exclusive"
+            + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
+            + "&synchronous=true" + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000")
+    private Endpoint synchronousTrueManualAck;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private CountDownLatch countDownLatch;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            Processor processor = new Processor() {
+                @Override
+                public void process(final Exchange exchange) throws InterruptedException {
+                    LOGGER.info("Processing message {}", exchange.getIn().getBody());
+                    countDownLatch.countDown();
+                    countDownLatch.await(20, TimeUnit.SECONDS);
+                }
+            };
+
+            Processor manualAckProcessor = new Processor() {
+                @Override
+                public void process(final Exchange exchange) throws PulsarClientException {
+                    LOGGER.info("Processing message {}", exchange.getIn().getBody());
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(
+                            PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    receipt.acknowledge();
+                }
+            };
+
+            @Override
+            public void configure() {
+
+                from(synchronousTrue)
+                        .threads(2)
+                        .process(processor)
+                        .end()
+                        .to(to);
+
+                from(synchronousDefault)
+                        .threads(2)
+                        .process(processor)
+                        .end()
+                        .to(to);
+
+                from(synchronousTrueThrowsException)
+                        .threads(2)
+                        .throwException(new RuntimeException("Processor throws exception."))
+                        .end()
+                        .to(to);
+
+                from(synchronousTrueManualAck)
+                        .threads(2)
+                        .process(manualAckProcessor)
+                        .end()
+                        .to(to);
+            }
+        };
+    }
+
+    @Override
+    protected Registry createCamelRegistry() throws Exception {
+        SimpleRegistry registry = new SimpleRegistry();
+
+        registerPulsarBeans(registry);
+
+        return registry;
+    }
+
+    private void registerPulsarBeans(SimpleRegistry registry) throws PulsarClientException {
+        PulsarClient pulsarClient = givenPulsarClient();
+        AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
+
+        registry.bind("pulsarClient", pulsarClient);
+        PulsarComponent comp = new PulsarComponent(context);
+        comp.setAutoConfiguration(autoConfiguration);
+        comp.setPulsarClient(pulsarClient);
+        registry.bind("pulsar", comp);
+
+    }
+
+    private PulsarClient givenPulsarClient() throws PulsarClientException {
+        return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
+    }
+
+    @Test
+    public void testMessagesProcessedSynchronously() throws Exception {
+        processSynchronously(TOPIC_URI_SYNCHRONOUS_TRUE);
+    }
+
+    @Test
+    public void testMessagesProcessedSynchronouslyByDefault() throws Exception {
+        processSynchronously(TOPIC_URI_SYNCHRONOUS_DEFAULT);
+    }
+
+    public void processSynchronously(String topic) throws Exception {
+
+        to.expectedMessageCount(2);
+
+        countDownLatch = new CountDownLatch(2);
+
+        Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER)
+                .topic(topic).create();
+
+        producer.send("One");
+        producer.send("Two");
+
+        to.assertIsNotSatisfied(2000L); // ms
+
+    }
+
+    @Test
+    public void testMessageProcessedSynchronouslyThrowsException() throws Exception {
+        throwsException(TOPIC_URI_SYNCHRONOUS_TRUE_THROWS_EXCEPTION);
+    }
+
+    public void throwsException(String topic) throws Exception {
+        to.expectedMessageCount(0);
+        Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER)
+                .topic(topic).create();
+
+        producer.send("One");
+
+        MockEndpoint.assertIsSatisfied(2, TimeUnit.SECONDS, to);
+    }
+
+    @Test
+    public void testMessagesProcessedSynchronouslyManualAcknowledge() throws Exception {
+        manualAcknowledgement(TOPIC_URI_SYNCHRONOUS_TRUE_MANUAL_ACK);
+    }
+
+    public void manualAcknowledgement(String topic) throws Exception {
+        to.expectsNoDuplicates(body());
+        to.expectedMessageCount(1);
+
+        Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER)
+                .topic(topic).create();
+
+        producer.send("Hello World!");
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+    }
+
+}
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java
index 6173eef..5881705 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java
@@ -92,11 +92,11 @@ public interface PulsarComponentBuilderFactory {
         }
         /**
          * Whether to allow manual message acknowledgements. If this option is
-         * enabled, then messages are not immediately acknowledged after being
-         * consumed. Instead, an instance of PulsarMessageReceipt is stored as a
-         * header on the org.apache.camel.Exchange. Messages can then be
-         * acknowledged using PulsarMessageReceipt at any time before the
-         * ackTimeout occurs.
+         * enabled, then messages are not acknowledged automatically after
+         * successful route completion. Instead, an instance of
+         * PulsarMessageReceipt is stored as a header on the
+         * org.apache.camel.Exchange. Messages can then be acknowledged using
+         * PulsarMessageReceipt at any time before the ackTimeout occurs.
          * 
          * The option is a: <code>boolean</code> type.
          * 
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
index 0c2d9d8..d5e78fe 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
@@ -99,11 +99,11 @@ public interface PulsarEndpointBuilderFactory {
         }
         /**
          * Whether to allow manual message acknowledgements. If this option is
-         * enabled, then messages are not immediately acknowledged after being
-         * consumed. Instead, an instance of PulsarMessageReceipt is stored as a
-         * header on the org.apache.camel.Exchange. Messages can then be
-         * acknowledged using PulsarMessageReceipt at any time before the
-         * ackTimeout occurs.
+         * enabled, then messages are not acknowledged automatically after
+         * successful route completion. Instead, an instance of
+         * PulsarMessageReceipt is stored as a header on the
+         * org.apache.camel.Exchange. Messages can then be acknowledged using
+         * PulsarMessageReceipt at any time before the ackTimeout occurs.
          * 
          * The option is a: <code>boolean</code> type.
          * 
@@ -117,11 +117,11 @@ public interface PulsarEndpointBuilderFactory {
         }
         /**
          * Whether to allow manual message acknowledgements. If this option is
-         * enabled, then messages are not immediately acknowledged after being
-         * consumed. Instead, an instance of PulsarMessageReceipt is stored as a
-         * header on the org.apache.camel.Exchange. Messages can then be
-         * acknowledged using PulsarMessageReceipt at any time before the
-         * ackTimeout occurs.
+         * enabled, then messages are not acknowledged automatically after
+         * successful route completion. Instead, an instance of
+         * PulsarMessageReceipt is stored as a header on the
+         * org.apache.camel.Exchange. Messages can then be acknowledged using
+         * PulsarMessageReceipt at any time before the ackTimeout occurs.
          * 
          * The option will be converted to a <code>boolean</code> type.
          * 
@@ -493,7 +493,7 @@ public interface PulsarEndpointBuilderFactory {
          * 
          * The option is a: <code>boolean</code> type.
          * 
-         * Default: false
+         * Default: true
          * Group: advanced
          */
         default AdvancedPulsarEndpointConsumerBuilder synchronous(
@@ -507,7 +507,7 @@ public interface PulsarEndpointBuilderFactory {
          * 
          * The option will be converted to a <code>boolean</code> type.
          * 
-         * Default: false
+         * Default: true
          * Group: advanced
          */
         default AdvancedPulsarEndpointConsumerBuilder synchronous(
@@ -927,7 +927,7 @@ public interface PulsarEndpointBuilderFactory {
          * 
          * The option is a: <code>boolean</code> type.
          * 
-         * Default: false
+         * Default: true
          * Group: advanced
          */
         default AdvancedPulsarEndpointProducerBuilder synchronous(
@@ -941,7 +941,7 @@ public interface PulsarEndpointBuilderFactory {
          * 
          * The option will be converted to a <code>boolean</code> type.
          * 
-         * Default: false
+         * Default: true
          * Group: advanced
          */
         default AdvancedPulsarEndpointProducerBuilder synchronous(
@@ -1007,7 +1007,7 @@ public interface PulsarEndpointBuilderFactory {
          * 
          * The option is a: <code>boolean</code> type.
          * 
-         * Default: false
+         * Default: true
          * Group: advanced
          */
         default AdvancedPulsarEndpointBuilder synchronous(boolean synchronous) {
@@ -1020,7 +1020,7 @@ public interface PulsarEndpointBuilderFactory {
          * 
          * The option will be converted to a <code>boolean</code> type.
          * 
-         * Default: false
+         * Default: true
          * Group: advanced
          */
         default AdvancedPulsarEndpointBuilder synchronous(String synchronous) {
diff --git a/docs/components/modules/ROOT/pages/pulsar-component.adoc b/docs/components/modules/ROOT/pages/pulsar-component.adoc
index 32a0648..0ddc816 100644
--- a/docs/components/modules/ROOT/pages/pulsar-component.adoc
+++ b/docs/components/modules/ROOT/pages/pulsar-component.adoc
@@ -42,7 +42,7 @@ The Pulsar component supports 32 options, which are listed below.
 | *configuration* (common) | Allows to pre-configure the Pulsar component with common options that the endpoints will reuse. |  | PulsarConfiguration
 | *ackGroupTimeMillis* (consumer) | Group the consumer acknowledgments for the specified time in milliseconds - defaults to 100 | 100 | long
 | *ackTimeoutMillis* (consumer) | Timeout for unacknowledged messages in milliseconds - defaults to 10000 | 10000 | long
-| *allowManualAcknowledgement* (consumer) | Whether to allow manual message acknowledgements. If this option is enabled, then messages are not immediately acknowledged after being consumed. Instead, an instance of PulsarMessageReceipt is stored as a header on the org.apache.camel.Exchange. Messages can then be acknowledged using PulsarMessageReceipt at any time before the ackTimeout occurs. | false | boolean
+| *allowManualAcknowledgement* (consumer) | Whether to allow manual message acknowledgements. If this option is enabled, then messages are not acknowledged automatically after successful route completion. Instead, an instance of PulsarMessageReceipt is stored as a header on the org.apache.camel.Exchange. Messages can then be acknowledged using PulsarMessageReceipt at any time before the ackTimeout occurs. | false | boolean
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
 | *consumerName* (consumer) | Name of the consumer when subscription is EXCLUSIVE | sole-consumer | String
 | *consumerNamePrefix* (consumer) | Prefix to add to consumer names when a SHARED or FAILOVER subscription is used | cons | String
@@ -108,7 +108,7 @@ with the following path and query parameters:
 | Name | Description | Default | Type
 | *ackGroupTimeMillis* (consumer) | Group the consumer acknowledgments for the specified time in milliseconds - defaults to 100 | 100 | long
 | *ackTimeoutMillis* (consumer) | Timeout for unacknowledged messages in milliseconds - defaults to 10000 | 10000 | long
-| *allowManualAcknowledgement* (consumer) | Whether to allow manual message acknowledgements. If this option is enabled, then messages are not immediately acknowledged after being consumed. Instead, an instance of PulsarMessageReceipt is stored as a header on the org.apache.camel.Exchange. Messages can then be acknowledged using PulsarMessageReceipt at any time before the ackTimeout occurs. | false | boolean
+| *allowManualAcknowledgement* (consumer) | Whether to allow manual message acknowledgements. If this option is enabled, then messages are not acknowledged automatically after successful route completion. Instead, an instance of PulsarMessageReceipt is stored as a header on the org.apache.camel.Exchange. Messages can then be acknowledged using PulsarMessageReceipt at any time before the ackTimeout occurs. | false | boolean
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
 | *consumerName* (consumer) | Name of the consumer when subscription is EXCLUSIVE | sole-consumer | String
 | *consumerNamePrefix* (consumer) | Prefix to add to consumer names when a SHARED or FAILOVER subscription is used | cons | String
@@ -136,7 +136,7 @@ with the following path and query parameters:
 | *producerName* (producer) | Name of the producer. If unset, lets Pulsar select a unique identifier. |  | String
 | *sendTimeoutMs* (producer) | Send timeout in milliseconds | 30000 | int
 | *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
-| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
+| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | true | boolean
 |===
 // endpoint options: END