You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/03/13 08:27:42 UTC

[camel] branch master updated: CAMEL-14692: Add Key_Shared subscription type (#3630)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 01cebf6  CAMEL-14692: Add Key_Shared subscription type (#3630)
01cebf6 is described below

commit 01cebf6792accb70743ba2dcbdfe8f4e05f5a90a
Author: William Thompson <wi...@toasttab.com>
AuthorDate: Fri Mar 13 04:27:27 2020 -0400

    CAMEL-14692: Add Key_Shared subscription type (#3630)
    
    * CAMEL-14692: Add Key_Shared subscription type
    
    * CAMEL-14692: Regenerate docs/components
---
 .../pulsar/PulsarComponentConfigurer.java          |  2 +
 .../component/pulsar/PulsarEndpointConfigurer.java |  2 +
 .../org/apache/camel/component/pulsar/pulsar.json  |  6 ++-
 .../src/main/docs/pulsar-component.adoc            | 10 ++--
 .../component/pulsar/PulsarConfiguration.java      | 20 ++++++-
 .../camel/component/pulsar/PulsarProducer.java     |  3 +-
 .../consumers/ConsumerCreationStrategyFactory.java |  2 +
 .../utils/consumers/KeySharedConsumerStrategy.java | 63 ++++++++++++++++++++++
 .../pulsar/utils/consumers/SubscriptionType.java   |  2 +-
 .../ConsumerCreationStrategyFactoryTest.java       |  9 ++++
 .../dsl/PulsarComponentBuilderFactory.java         | 19 ++++++-
 .../endpoint/dsl/PulsarEndpointBuilderFactory.java | 39 ++++++++++++--
 12 files changed, 161 insertions(+), 16 deletions(-)

diff --git a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
index bcb5191..5b07ccf 100644
--- a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
+++ b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
@@ -32,6 +32,8 @@ public class PulsarComponentConfigurer extends PropertyConfigurerSupport impleme
         case "autoConfiguration": target.setAutoConfiguration(property(camelContext, org.apache.camel.component.pulsar.utils.AutoConfiguration.class, value)); return true;
         case "basicpropertybinding":
         case "basicPropertyBinding": target.setBasicPropertyBinding(property(camelContext, boolean.class, value)); return true;
+        case "batcherbuilder":
+        case "batcherBuilder": getOrCreateConfiguration(target).setBatcherBuilder(property(camelContext, org.apache.pulsar.client.api.BatcherBuilder.class, value)); return true;
         case "batchingenabled":
         case "batchingEnabled": getOrCreateConfiguration(target).setBatchingEnabled(property(camelContext, boolean.class, value)); return true;
         case "batchingmaxmessages":
diff --git a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
index 6bcf208..4e4895d 100644
--- a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
+++ b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
@@ -23,6 +23,8 @@ public class PulsarEndpointConfigurer extends PropertyConfigurerSupport implemen
         case "allowManualAcknowledgement": target.getPulsarConfiguration().setAllowManualAcknowledgement(property(camelContext, boolean.class, value)); return true;
         case "basicpropertybinding":
         case "basicPropertyBinding": target.setBasicPropertyBinding(property(camelContext, boolean.class, value)); return true;
+        case "batcherbuilder":
+        case "batcherBuilder": target.getPulsarConfiguration().setBatcherBuilder(property(camelContext, org.apache.pulsar.client.api.BatcherBuilder.class, value)); return true;
         case "batchingenabled":
         case "batchingEnabled": target.getPulsarConfiguration().setBatchingEnabled(property(camelContext, boolean.class, value)); return true;
         case "batchingmaxmessages":
diff --git a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
index c202ca0..2bdec63 100644
--- a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
+++ b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
@@ -33,8 +33,9 @@
     "numberOfConsumers": { "kind": "property", "displayName": "Number Of Consumers", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "1", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Number of consumers - defaults to 1" },
     "subscriptionInitialPosition": { "kind": "property", "displayName": "Subscription Initial Position", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition", "enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false, "defaultValue": "LATEST", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", [...]
     "subscriptionName": { "kind": "property", "displayName": "Subscription Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "subs", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Name of the subscription to use" },
-    "subscriptionType": { "kind": "property", "displayName": "Subscription Type", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.pulsar.utils.consumers.SubscriptionType", "enum": [ "EXCLUSIVE", "SHARED", "FAILOVER" ], "deprecated": false, "secret": false, "defaultValue": "EXCLUSIVE", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "T [...]
+    "subscriptionType": { "kind": "property", "displayName": "Subscription Type", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.pulsar.utils.consumers.SubscriptionType", "enum": [ "EXCLUSIVE", "SHARED", "FAILOVER", "KEY_SHARED" ], "deprecated": false, "secret": false, "defaultValue": "EXCLUSIVE", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "de [...]
     "pulsarMessageReceiptFactory": { "kind": "property", "displayName": "Pulsar Message Receipt Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.pulsar.PulsarMessageReceiptFactory", "deprecated": false, "secret": false, "description": "Provide a factory to create an alternate implementation of PulsarMessageReceipt." },
+    "batcherBuilder": { "kind": "property", "displayName": "Batcher Builder", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.BatcherBuilder", "deprecated": false, "secret": false, "defaultValue": "DEFAULT", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Control batching method used by the producer." },
     "batchingEnabled": { "kind": "property", "displayName": "Batching Enabled", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "true", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Control whether automatic batching of messages is enabled for the producer." },
     "batchingMaxMessages": { "kind": "property", "displayName": "Batching Max Messages", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "1000", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "The maximum size to batch messages." },
     "batchingMaxPublishDelayMicros": { "kind": "property", "displayName": "Batching Max Publish Delay Micros", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "1000", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "The maximum time period within which the messages sent will be batched if batchingEna [...]
@@ -70,9 +71,10 @@
     "numberOfConsumers": { "kind": "parameter", "displayName": "Number Of Consumers", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "1", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Number of consumers - defaults to 1" },
     "subscriptionInitialPosition": { "kind": "parameter", "displayName": "Subscription Initial Position", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition", "enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false, "defaultValue": "LATEST", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfigur [...]
     "subscriptionName": { "kind": "parameter", "displayName": "Subscription Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "subs", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Name of the subscription to use" },
-    "subscriptionType": { "kind": "parameter", "displayName": "Subscription Type", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.pulsar.utils.consumers.SubscriptionType", "enum": [ "EXCLUSIVE", "SHARED", "FAILOVER" ], "deprecated": false, "secret": false, "defaultValue": "EXCLUSIVE", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "descripti [...]
+    "subscriptionType": { "kind": "parameter", "displayName": "Subscription Type", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.pulsar.utils.consumers.SubscriptionType", "enum": [ "EXCLUSIVE", "SHARED", "FAILOVER", "KEY_SHARED" ], "deprecated": false, "secret": false, "defaultValue": "EXCLUSIVE", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguratio [...]
     "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with [...]
     "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
+    "batcherBuilder": { "kind": "parameter", "displayName": "Batcher Builder", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.BatcherBuilder", "deprecated": false, "secret": false, "defaultValue": "DEFAULT", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Control batching method used by the producer." },
     "batchingEnabled": { "kind": "parameter", "displayName": "Batching Enabled", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "true", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Control whether automatic batching of messages is enabled for the producer." },
     "batchingMaxMessages": { "kind": "parameter", "displayName": "Batching Max Messages", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "1000", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "The maximum size to batch messages." },
     "batchingMaxPublishDelayMicros": { "kind": "parameter", "displayName": "Batching Max Publish Delay Micros", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "1000", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "The maximum time period within which the messages sent will be batched if batc [...]
diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index e9de8d2..8f78530 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -31,7 +31,7 @@ pulsar:[persistent|non-persistent]://tenant/namespace/topic
 
 
 // component options: START
-The Pulsar component supports 32 options, which are listed below.
+The Pulsar component supports 33 options, which are listed below.
 
 
 
@@ -52,8 +52,9 @@ The Pulsar component supports 32 options, which are listed below.
 | *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 | int
 | *subscriptionInitialPosition* (consumer) | Control the initial position in the topic of a newly created subscription. Default is latest message. The value can be one of: EARLIEST, LATEST | LATEST | SubscriptionInitialPosition
 | *subscriptionName* (consumer) | Name of the subscription to use | subs | String
-| *subscriptionType* (consumer) | Type of the subscription EXCLUSIVESHAREDFAILOVER, defaults to EXCLUSIVE. The value can be one of: EXCLUSIVE, SHARED, FAILOVER | EXCLUSIVE | SubscriptionType
+| *subscriptionType* (consumer) | Type of the subscription (EXCLUSIVE, SHARED, FAILOVER or KEY_SHARED), defaults to EXCLUSIVE. The value can be one of: EXCLUSIVE, SHARED, FAILOVER, KEY_SHARED | EXCLUSIVE | SubscriptionType
 | *pulsarMessageReceiptFactory* (consumer) | Provide a factory to create an alternate implementation of PulsarMessageReceipt. |  | PulsarMessageReceiptFactory
+| *batcherBuilder* (producer) | Control batching method used by the producer. | DEFAULT | BatcherBuilder
 | *batchingEnabled* (producer) | Control whether automatic batching of messages is enabled for the producer. | true | boolean
 | *batchingMaxMessages* (producer) | The maximum size to batch messages. | 1000 | int
 | *batchingMaxPublishDelayMicros* (producer) | The maximum time period within which the messages sent will be batched if batchingEnabled is true. | 1000 | long
@@ -99,7 +100,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (31 parameters):
+=== Query Parameters (32 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -118,9 +119,10 @@ with the following path and query parameters:
 | *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 | int
 | *subscriptionInitialPosition* (consumer) | Control the initial position in the topic of a newly created subscription. Default is latest message. The value can be one of: EARLIEST, LATEST | LATEST | SubscriptionInitialPosition
 | *subscriptionName* (consumer) | Name of the subscription to use | subs | String
-| *subscriptionType* (consumer) | Type of the subscription EXCLUSIVESHAREDFAILOVER, defaults to EXCLUSIVE. The value can be one of: EXCLUSIVE, SHARED, FAILOVER | EXCLUSIVE | SubscriptionType
+| *subscriptionType* (consumer) | Type of the subscription (EXCLUSIVE, SHARED, FAILOVER or KEY_SHARED), defaults to EXCLUSIVE. The value can be one of: EXCLUSIVE, SHARED, FAILOVER, KEY_SHARED | EXCLUSIVE | SubscriptionType
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. The value can be one of: InOnly, InOut, InOptionalOut |  | ExchangePattern
+| *batcherBuilder* (producer) | Control batching method used by the producer. | DEFAULT | BatcherBuilder
 | *batchingEnabled* (producer) | Control whether automatic batching of messages is enabled for the producer. | true | boolean
 | *batchingMaxMessages* (producer) | The maximum size to batch messages. | 1000 | int
 | *batchingMaxPublishDelayMicros* (producer) | The maximum time period within which the messages sent will be batched if batchingEnabled is true. | 1000 | long
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
index ad7d011..615e9b7 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosi
 import org.apache.camel.component.pulsar.utils.consumers.SubscriptionType;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
+import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.MessageRouter;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -76,6 +77,8 @@ public class PulsarConfiguration implements Cloneable {
     private int batchingMaxMessages = 1000;
     @UriParam(label = "producer", description = "Control whether automatic batching of messages is enabled for the producer.", defaultValue = "true")
     private boolean batchingEnabled = true;
+    @UriParam(label = "producer", description = "Control batching method used by the producer.", defaultValue = "DEFAULT")
+    private BatcherBuilder batcherBuilder = BatcherBuilder.DEFAULT;
     @UriParam(label = "producer", description = "The first message published will have a sequence Id of initialSequenceId  1.", defaultValue = "-1")
     private long initialSequenceId = -1;
     @UriParam(label = "producer", description = "Compression type to use", defaultValue = "NONE")
@@ -113,7 +116,7 @@ public class PulsarConfiguration implements Cloneable {
     }
 
     /**
-     * Type of the subscription [EXCLUSIVE|SHARED|FAILOVER], defaults to
+     * Type of the subscription [EXCLUSIVE|SHARED|FAILOVER|KEY_SHARED], defaults to
      * EXCLUSIVE
      */
     public void setSubscriptionType(SubscriptionType subscriptionType) {
@@ -308,6 +311,21 @@ public class PulsarConfiguration implements Cloneable {
     }
 
     /**
+     * Control batching method of the Pulsar producer.
+     * KEY_BASED batches based on the Pulsar message key.
+     * DEFAULT batches all messages together regardless of key;
+     * this may cause only a single consumer to work when consuming using a KEY_SHARED subscription.
+     * Default is DEFAULT.
+     */
+    public void setBatcherBuilder(BatcherBuilder batcherBuilder) {
+        this.batcherBuilder = batcherBuilder;
+    }
+
+    public BatcherBuilder getBatcherBuilder() {
+        return batcherBuilder;
+    }
+
+    /**
      * Control the initial position in the topic of a newly created subscription. Default is latest message.
      */
     public void setSubscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition) {
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index c372fdbc..399ed59 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -88,7 +88,8 @@ public class PulsarProducer extends DefaultProducer {
                 .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS).blockIfQueueFull(configuration.isBlockIfQueueFull())
                 .maxPendingMessages(configuration.getMaxPendingMessages()).maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
                 .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS).batchingMaxMessages(configuration.getMaxPendingMessages())
-                .enableBatching(configuration.isBatchingEnabled()).initialSequenceId(configuration.getInitialSequenceId()).compressionType(configuration.getCompressionType());
+                .enableBatching(configuration.isBatchingEnabled()).batcherBuilder(configuration.getBatcherBuilder())
+                .initialSequenceId(configuration.getInitialSequenceId()).compressionType(configuration.getCompressionType());
             if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) {
                 producerBuilder.messageRouter(configuration.getMessageRouter());
             } else {
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/ConsumerCreationStrategyFactory.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/ConsumerCreationStrategyFactory.java
index 90bfd4f..fd3e2d7 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/ConsumerCreationStrategyFactory.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/ConsumerCreationStrategyFactory.java
@@ -47,6 +47,8 @@ public final class ConsumerCreationStrategyFactory {
                 return new ExclusiveConsumerStrategy(pulsarConsumer);
             case FAILOVER:
                 return new FailoverConsumerStrategy(pulsarConsumer);
+            case KEY_SHARED:
+                return new KeySharedConsumerStrategy(pulsarConsumer);
             default:
                 return new ExclusiveConsumerStrategy(pulsarConsumer);
         }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/KeySharedConsumerStrategy.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/KeySharedConsumerStrategy.java
new file mode 100644
index 0000000..2a14f85
--- /dev/null
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/KeySharedConsumerStrategy.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar.utils.consumers;
+
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.apache.camel.component.pulsar.PulsarConfiguration;
+import org.apache.camel.component.pulsar.PulsarConsumer;
+import org.apache.camel.component.pulsar.PulsarEndpoint;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Creates the consumers backing a Pulsar {@code Key_Shared} subscription. */
+public class KeySharedConsumerStrategy implements ConsumerCreationStrategy {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KeySharedConsumerStrategy.class);
+
+    private final PulsarConsumer pulsarConsumer;
+
+    KeySharedConsumerStrategy(PulsarConsumer pulsarConsumer) {
+        this.pulsarConsumer = pulsarConsumer;
+    }
+
+    /**
+     * Subscribes getNumberOfConsumers() consumers, each named getConsumerNamePrefix() plus its index.
+     * A consumer whose subscription fails is logged and left out of the returned collection.
+     */
+    @Override
+    public Collection<Consumer<byte[]>> create(final PulsarEndpoint pulsarEndpoint) {
+        final Collection<Consumer<byte[]>> consumers = new LinkedList<>();
+        final PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
+        for (int i = 0; i < configuration.getNumberOfConsumers(); i++) {
+            final String consumerName = configuration.getConsumerNamePrefix() + i;
+            try {
+                ConsumerBuilder<byte[]> builder = CommonCreationStrategyImpl.create(consumerName, pulsarEndpoint, pulsarConsumer);
+                consumers.add(builder.subscriptionType(SubscriptionType.Key_Shared).subscribe());
+            } catch (PulsarClientException exception) {
+                // One placeholder only: SLF4J uses the trailing Throwable for the stack trace, not for a "{}".
+                LOGGER.error("A PulsarClientException occurred when creating Consumer {}", consumerName, exception);
+            }
+        }
+        return consumers;
+    }
+}
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/SubscriptionType.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/SubscriptionType.java
index 5543946..800abb7 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/SubscriptionType.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/SubscriptionType.java
@@ -17,5 +17,5 @@
 package org.apache.camel.component.pulsar.utils.consumers;
 
 public enum SubscriptionType {
-    EXCLUSIVE, SHARED, FAILOVER
+    EXCLUSIVE, SHARED, FAILOVER, KEY_SHARED
 }
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/consumers/ConsumerCreationStrategyFactoryTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/consumers/ConsumerCreationStrategyFactoryTest.java
index 01ba587..de45ca7 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/consumers/ConsumerCreationStrategyFactoryTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/consumers/ConsumerCreationStrategyFactoryTest.java
@@ -65,6 +65,15 @@ public class ConsumerCreationStrategyFactoryTest {
     }
 
     @Test
+    public void verifyKeySharedStrategy() {
+        ConsumerCreationStrategyFactory factory = ConsumerCreationStrategyFactory.create(mock(PulsarConsumer.class));
+
+        ConsumerCreationStrategy strategy = factory.getStrategy(SubscriptionType.KEY_SHARED);
+
+        assertEquals(KeySharedConsumerStrategy.class, strategy.getClass());
+    }
+
+    @Test
     public void verifyDefaultStrategyIsExclusiveStrategy() {
         ConsumerCreationStrategyFactory factory = ConsumerCreationStrategyFactory.create(mock(PulsarConsumer.class));
 
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java
index 5881705..49920a7 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java
@@ -248,8 +248,8 @@ public interface PulsarComponentBuilderFactory {
             return this;
         }
         /**
-         * Type of the subscription EXCLUSIVESHAREDFAILOVER, defaults to
-         * EXCLUSIVE.
+         * Type of the subscription (EXCLUSIVE, SHARED, FAILOVER or KEY_SHARED),
+         * defaults to EXCLUSIVE.
          * 
          * The option is a:
          * <code>org.apache.camel.component.pulsar.utils.consumers.SubscriptionType</code> type.
@@ -277,6 +277,20 @@ public interface PulsarComponentBuilderFactory {
             return this;
         }
         /**
+         * Control batching method used by the producer.
+         * 
+         * The option is a:
+         * <code>org.apache.pulsar.client.api.BatcherBuilder</code> type.
+         * 
+         * Default: DEFAULT
+         * Group: producer
+         */
+        default PulsarComponentBuilder batcherBuilder(
+                org.apache.pulsar.client.api.BatcherBuilder batcherBuilder) {
+            doSetProperty("batcherBuilder", batcherBuilder);
+            return this;
+        }
+        /**
          * Control whether automatic batching of messages is enabled for the
          * producer.
          * 
@@ -538,6 +552,7 @@ public interface PulsarComponentBuilderFactory {
             case "subscriptionName": getOrCreateConfiguration((PulsarComponent) component).setSubscriptionName((java.lang.String) value); return true;
             case "subscriptionType": getOrCreateConfiguration((PulsarComponent) component).setSubscriptionType((org.apache.camel.component.pulsar.utils.consumers.SubscriptionType) value); return true;
             case "pulsarMessageReceiptFactory": ((PulsarComponent) component).setPulsarMessageReceiptFactory((org.apache.camel.component.pulsar.PulsarMessageReceiptFactory) value); return true;
+            case "batcherBuilder": getOrCreateConfiguration((PulsarComponent) component).setBatcherBuilder((org.apache.pulsar.client.api.BatcherBuilder) value); return true;
             case "batchingEnabled": getOrCreateConfiguration((PulsarComponent) component).setBatchingEnabled((boolean) value); return true;
             case "batchingMaxMessages": getOrCreateConfiguration((PulsarComponent) component).setBatchingMaxMessages((int) value); return true;
             case "batchingMaxPublishDelayMicros": getOrCreateConfiguration((PulsarComponent) component).setBatchingMaxPublishDelayMicros((long) value); return true;
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
index d5e78fe..e11c7c3 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
@@ -362,8 +362,8 @@ public interface PulsarEndpointBuilderFactory {
             return this;
         }
         /**
-         * Type of the subscription EXCLUSIVESHAREDFAILOVER, defaults to
-         * EXCLUSIVE.
+         * Type of the subscription (EXCLUSIVE, SHARED, FAILOVER or KEY_SHARED),
+         * defaults to EXCLUSIVE.
          * 
          * The option is a:
          * <code>org.apache.camel.component.pulsar.utils.consumers.SubscriptionType</code> type.
@@ -377,8 +377,8 @@ public interface PulsarEndpointBuilderFactory {
             return this;
         }
         /**
-         * Type of the subscription EXCLUSIVESHAREDFAILOVER, defaults to
-         * EXCLUSIVE.
+         * Type of the subscription (EXCLUSIVE, SHARED, FAILOVER or KEY_SHARED),
+         * defaults to EXCLUSIVE.
          * 
          * The option will be converted to a
          * <code>org.apache.camel.component.pulsar.utils.consumers.SubscriptionType</code> type.
@@ -527,6 +527,34 @@ public interface PulsarEndpointBuilderFactory {
             return (AdvancedPulsarEndpointProducerBuilder) this;
         }
         /**
+         * Control batching method used by the producer.
+         * 
+         * The option is a:
+         * <code>org.apache.pulsar.client.api.BatcherBuilder</code> type.
+         * 
+         * Default: DEFAULT
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder batcherBuilder(
+                Object batcherBuilder) {
+            doSetProperty("batcherBuilder", batcherBuilder);
+            return this;
+        }
+        /**
+         * Control batching method used by the producer.
+         * 
+         * The option will be converted to a
+         * <code>org.apache.pulsar.client.api.BatcherBuilder</code> type.
+         * 
+         * Default: DEFAULT
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder batcherBuilder(
+                String batcherBuilder) {
+            doSetProperty("batcherBuilder", batcherBuilder);
+            return this;
+        }
+        /**
          * Control whether automatic batching of messages is enabled for the
          * producer.
          * 
@@ -1045,7 +1073,8 @@ public interface PulsarEndpointBuilderFactory {
     enum SubscriptionType {
         EXCLUSIVE,
         SHARED,
-        FAILOVER;
+        FAILOVER,
+        KEY_SHARED;
     }
 
     /**