You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/06/19 20:47:54 UTC

[pulsar] branch master updated: Introduce batch message container framework and support key based batching container (#4435)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b45736a  Introduce batch message container framework and support key based batching container (#4435)
b45736a is described below

commit b45736ad0738116c9c3cae27ed18f1342b55139e
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jun 20 04:47:48 2019 +0800

    Introduce batch message container framework and support key based batching container (#4435)
    
    ### Motivation
    
    Introduce a batch message container framework to support multiple ways of batching messages.
    Currently, Pulsar supports only a basic batch message container. With the batch message container framework, other types of batch message containers can be implemented quickly, and users can even customize their own batch message container.
    
    Add a new batch message container named BatchMessageKeyBasedContainer to support batching messages in Key_Shared subscription mode.
---
 .../pulsar/broker/service/BatchMessageTest.java    | 163 ++++++++++++----
 .../client/api/KeySharedSubscriptionTest.java      | 167 ++++++----------
 .../pulsar/client/api/BatchMessageContainer.java   |  64 ++++++
 .../apache/pulsar/client/api/BatcherBuilder.java   |  56 ++++++
 .../apache/pulsar/client/api/ProducerBuilder.java  |  12 +-
 .../client/internal/DefaultImplementation.java     |  13 ++
 .../client/impl/AbstractBatchMessageContainer.java |  87 +++++++++
 .../client/impl/BatchMessageContainerBase.java     |  68 +++++++
 ...ntainer.java => BatchMessageContainerImpl.java} | 143 +++++++-------
 .../client/impl/BatchMessageKeyBasedContainer.java | 217 +++++++++++++++++++++
 .../pulsar/client/impl/DefaultBatcherBuilder.java  |  30 +++
 .../pulsar/client/impl/KeyBasedBatcherBuilder.java |  30 +++
 .../pulsar/client/impl/ProducerBuilderImpl.java    |   8 +
 .../apache/pulsar/client/impl/ProducerImpl.java    | 123 +++++-------
 .../impl/conf/ProducerConfigurationData.java       |   3 +-
 .../impl/conf/ConfigurationDataUtilsTest.java      |   1 +
 .../apache/pulsar/common/protocol/Commands.java    |   3 +
 .../java/org/apache/pulsar/storm/PulsarBolt.java   |   1 +
 18 files changed, 914 insertions(+), 275 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 516197d..265e3ec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -22,7 +22,6 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
-import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 
 import com.google.common.collect.Lists;
 
@@ -30,8 +29,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -39,12 +40,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
@@ -69,16 +72,31 @@ public class BatchMessageTest extends BrokerTestBase {
         super.internalCleanup();
     }
 
-    @DataProvider(name = "codec")
-    public Object[][] codecProvider() {
-        return new Object[][] { { CompressionType.NONE }, { CompressionType.LZ4 }, { CompressionType.ZLIB }, };
+    @DataProvider(name = "codecAndContainerBuilder")
+    public Object[][] codecAndContainerBuilderProvider() {
+        return new Object[][] {
+                { CompressionType.NONE, BatcherBuilder.DEFAULT },
+                { CompressionType.LZ4, BatcherBuilder.DEFAULT },
+                { CompressionType.ZLIB, BatcherBuilder.DEFAULT },
+                { CompressionType.NONE, BatcherBuilder.KEY_BASED },
+                { CompressionType.LZ4, BatcherBuilder.KEY_BASED },
+                { CompressionType.ZLIB, BatcherBuilder.KEY_BASED }
+        };
     }
 
-    @Test(dataProvider = "codec")
-    public void testSimpleBatchProducerWithFixedBatchSize(CompressionType compressionType) throws Exception {
+    @DataProvider(name = "containerBuilder")
+    public Object[][] containerBuilderProvider() {
+        return new Object[][] {
+                { BatcherBuilder.DEFAULT },
+                { BatcherBuilder.KEY_BASED }
+        };
+    }
+
+    @Test(dataProvider = "codecAndContainerBuilder")
+    public void testSimpleBatchProducerWithFixedBatchSize(CompressionType compressionType, BatcherBuilder builder) throws Exception {
         int numMsgs = 50;
         int numMsgsInBatch = numMsgs / 2;
-        final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSize";
+        final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSize-" + UUID.randomUUID();
         final String subscriptionName = "sub-1" + compressionType.toString();
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -87,6 +105,7 @@ public class BatchMessageTest extends BrokerTestBase {
 
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).compressionType(compressionType)
                 .batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true)
+                .batcherBuilder(builder)
                 .create();
 
         List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
@@ -117,10 +136,10 @@ public class BatchMessageTest extends BrokerTestBase {
         producer.close();
     }
 
-    @Test(dataProvider = "codec")
-    public void testSimpleBatchProducerWithFixedBatchTime(CompressionType compressionType) throws Exception {
+    @Test(dataProvider = "codecAndContainerBuilder")
+    public void testSimpleBatchProducerWithFixedBatchTime(CompressionType compressionType, BatcherBuilder builder) throws Exception {
         int numMsgs = 100;
-        final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchTime";
+        final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchTime-" + UUID.randomUUID();
         final String subscriptionName = "time-sub-1" + compressionType.toString();
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -128,7 +147,9 @@ public class BatchMessageTest extends BrokerTestBase {
         consumer.close();
 
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).compressionType(compressionType)
-                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS).enableBatching(true).create();
+                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS).enableBatching(true)
+                .batcherBuilder(builder)
+                .create();
 
         Random random = new Random();
         List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
@@ -151,10 +172,10 @@ public class BatchMessageTest extends BrokerTestBase {
         producer.close();
     }
 
-    @Test(dataProvider = "codec")
-    public void testSimpleBatchProducerWithFixedBatchSizeAndTime(CompressionType compressionType) throws Exception {
+    @Test(dataProvider = "codecAndContainerBuilder")
+    public void testSimpleBatchProducerWithFixedBatchSizeAndTime(CompressionType compressionType, BatcherBuilder builder) throws Exception {
         int numMsgs = 100;
-        final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSizeAndTime";
+        final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSizeAndTime-" + UUID.randomUUID();
         final String subscriptionName = "time-size-sub-1" + compressionType.toString();
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -163,6 +184,7 @@ public class BatchMessageTest extends BrokerTestBase {
 
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
                 .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS).batchingMaxMessages(5)
+                .batcherBuilder(builder)
                 .compressionType(compressionType).enableBatching(true).create();
 
         Random random = new Random();
@@ -186,11 +208,11 @@ public class BatchMessageTest extends BrokerTestBase {
         producer.close();
     }
 
-    @Test(dataProvider = "codec")
-    public void testBatchProducerWithLargeMessage(CompressionType compressionType) throws Exception {
+    @Test(dataProvider = "codecAndContainerBuilder")
+    public void testBatchProducerWithLargeMessage(CompressionType compressionType, BatcherBuilder builder) throws Exception {
         int numMsgs = 50;
         int numMsgsInBatch = numMsgs / 2;
-        final String topicName = "persistent://prop/ns-abc/testBatchProducerWithLargeMessage";
+        final String topicName = "persistent://prop/ns-abc/testBatchProducerWithLargeMessage-" + UUID.randomUUID();
         final String subscriptionName = "large-message-sub-1" + compressionType.toString();
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -199,6 +221,7 @@ public class BatchMessageTest extends BrokerTestBase {
 
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).compressionType(compressionType)
                 .batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true)
+                .batcherBuilder(builder)
                 .create();
 
         List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
@@ -238,11 +261,11 @@ public class BatchMessageTest extends BrokerTestBase {
         producer.close();
     }
 
-    @Test(dataProvider = "codec")
-    public void testSimpleBatchProducerConsumer(CompressionType compressionType) throws Exception {
+    @Test(dataProvider = "codecAndContainerBuilder")
+    public void testSimpleBatchProducerConsumer(CompressionType compressionType, BatcherBuilder builder) throws Exception {
         int numMsgs = 500;
         int numMsgsInBatch = numMsgs / 20;
-        final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerConsumer";
+        final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerConsumer-" + UUID.randomUUID();
         final String subscriptionName = "pc-sub-1" + compressionType.toString();
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -257,6 +280,7 @@ public class BatchMessageTest extends BrokerTestBase {
             // disabled size based batch
             .batchingMaxMessages(2 * numMsgs)
             .enableBatching(true)
+            .batcherBuilder(builder)
             .create();
 
         List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
@@ -296,11 +320,11 @@ public class BatchMessageTest extends BrokerTestBase {
         producer.close();
     }
 
-    @Test
-    public void testSimpleBatchSyncProducerWithFixedBatchSize() throws Exception {
+    @Test(dataProvider = "containerBuilder")
+    public void testSimpleBatchSyncProducerWithFixedBatchSize(BatcherBuilder builder) throws Exception {
         int numMsgs = 10;
         int numMsgsInBatch = numMsgs / 2;
-        final String topicName = "persistent://prop/ns-abc/testSimpleBatchSyncProducerWithFixedBatchSize";
+        final String topicName = "persistent://prop/ns-abc/testSimpleBatchSyncProducerWithFixedBatchSize-" + UUID.randomUUID();
         final String subscriptionName = "syncsub-1";
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -309,6 +333,7 @@ public class BatchMessageTest extends BrokerTestBase {
 
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
                 .batchingMaxPublishDelay(1, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true)
+                .batcherBuilder(builder)
                 .create();
 
         for (int i = 0; i < numMsgs; i++) {
@@ -338,11 +363,11 @@ public class BatchMessageTest extends BrokerTestBase {
 
     }
 
-    @Test
-    public void testSimpleBatchProducerConsumer1kMessages() throws Exception {
+    @Test(dataProvider = "containerBuilder")
+    public void testSimpleBatchProducerConsumer1kMessages(BatcherBuilder builder) throws Exception {
         int numMsgs = 2000;
         int numMsgsInBatch = 4;
-        final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerConsumer1kMessages";
+        final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerConsumer1kMessages-" + UUID.randomUUID();
         final String subscriptionName = "pc1k-sub-1";
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -351,6 +376,7 @@ public class BatchMessageTest extends BrokerTestBase {
 
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).maxPendingMessages(numMsgs + 1)
                 .batchingMaxPublishDelay(30, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true)
+                .batcherBuilder(builder)
                 .create();
 
         List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
@@ -465,11 +491,11 @@ public class BatchMessageTest extends BrokerTestBase {
         producer.close();
     }
 
-    @Test
-    public void testNonBatchCumulativeAckAfterBatchPublish() throws Exception {
+    @Test(dataProvider = "containerBuilder")
+    public void testNonBatchCumulativeAckAfterBatchPublish(BatcherBuilder builder) throws Exception {
         int numMsgs = 10;
         int numMsgsInBatch = numMsgs;
-        final String topicName = "persistent://prop/ns-abc/testNonBatchCumulativeAckAfterBatchPublish";
+        final String topicName = "persistent://prop/ns-abc/testNonBatchCumulativeAckAfterBatchPublish-" + UUID.randomUUID();
         final String subscriptionName = "nbcaabp-sub-1";
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -478,6 +504,7 @@ public class BatchMessageTest extends BrokerTestBase {
 
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
                 .batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true)
+                .batcherBuilder(builder)
                 .create();
         // create producer to publish non batch messages
         Producer<byte[]> noBatchProducer = pulsarClient.newProducer().topic(topicName).create();
@@ -517,11 +544,11 @@ public class BatchMessageTest extends BrokerTestBase {
         noBatchProducer.close();
     }
 
-    @Test
-    public void testBatchAndNonBatchCumulativeAcks() throws Exception {
+    @Test(dataProvider = "containerBuilder")
+    public void testBatchAndNonBatchCumulativeAcks(BatcherBuilder builder) throws Exception {
         int numMsgs = 50;
         int numMsgsInBatch = numMsgs / 10;
-        final String topicName = "persistent://prop/ns-abc/testBatchAndNonBatchCumulativeAcks";
+        final String topicName = "persistent://prop/ns-abc/testBatchAndNonBatchCumulativeAcks-" + UUID.randomUUID();
         final String subscriptionName = "bnb-sub-1";
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -532,6 +559,7 @@ public class BatchMessageTest extends BrokerTestBase {
             .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
             .batchingMaxMessages(numMsgsInBatch)
             .enableBatching(true)
+            .batcherBuilder(builder)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
         // create producer to publish non batch messages
@@ -590,10 +618,10 @@ public class BatchMessageTest extends BrokerTestBase {
      *
      * @throws Exception
      */
-    @Test(timeOut = 3000)
-    public void testConcurrentBatchMessageAck() throws Exception {
+    @Test(dataProvider = "containerBuilder", timeOut = 3000)
+    public void testConcurrentBatchMessageAck(BatcherBuilder builder) throws Exception {
         int numMsgs = 10;
-        final String topicName = "persistent://prop/ns-abc/testConcurrentAck";
+        final String topicName = "persistent://prop/ns-abc/testConcurrentAck-" + UUID.randomUUID();
         final String subscriptionName = "sub-1";
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -602,6 +630,7 @@ public class BatchMessageTest extends BrokerTestBase {
 
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
                 .batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxMessages(numMsgs).enableBatching(true)
+                .batcherBuilder(builder)
                 .create();
 
         List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
@@ -644,5 +673,69 @@ public class BatchMessageTest extends BrokerTestBase {
         producer.close();
     }
 
+    @Test
+    public void testOrderingOfKeyBasedBatchMessageContainer() throws PulsarClientException, ExecutionException, InterruptedException {
+        final String topicName = "persistent://prop/ns-abc/testKeyBased";
+        final String subscriptionName = "sub-1";
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
+                .batchingMaxMessages(30)
+                .enableBatching(true)
+                .batcherBuilder(BatcherBuilder.KEY_BASED)
+                .create();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+        List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
+        String[] keys = new String[]{"key-1", "key-2", "key-3"};
+        for (int i = 0; i < 10; i++) {
+            byte[] message = ("my-message-" + i).getBytes();
+            for (String key : keys) {
+                sendFutureList.add(producer.newMessage().key(key).value(message).sendAsync());
+            }
+        }
+        FutureUtil.waitForAll(sendFutureList).get();
+
+        for (int i = 0; i < 30; i++) {
+            Message<byte[]> received = consumer.receive();
+            if (i < 10) {
+                assertEquals(received.getKey(), "key-1");
+            } else if (i < 20) {
+                assertEquals(received.getKey(), "key-2");
+            } else {
+                assertEquals(received.getKey(), "key-3");
+            }
+            consumer.acknowledge(received);
+        }
+
+        for (int i = 0; i < 10; i++) {
+            byte[] message = ("my-message-" + i).getBytes();
+            for (String key : keys) {
+                sendFutureList.add(producer.newMessage()
+                        .key(UUID.randomUUID().toString())
+                        .orderingKey(key.getBytes())
+                        .value(message)
+                        .sendAsync());
+            }
+        }
+        FutureUtil.waitForAll(sendFutureList).get();
+
+        for (int i = 0; i < 30; i++) {
+            Message<byte[]> received = consumer.receive();
+            if (i < 10) {
+                assertEquals(new String(received.getOrderingKey()), "key-1");
+            } else if (i < 20) {
+                assertEquals(new String(received.getOrderingKey()), "key-2");
+            } else {
+                assertEquals(new String(received.getOrderingKey()), "key-3");
+            }
+            consumer.acknowledge(received);
+        }
+
+        consumer.close();
+        producer.close();
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(BatchMessageTest.class);
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index a6f357e..494a8c8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import java.util.ArrayList;
@@ -37,6 +38,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static org.testng.Assert.assertTrue;
@@ -46,6 +48,13 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(KeySharedSubscriptionTest.class);
     private static final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
 
+    @DataProvider(name = "batch")
+    public Object[][] batchProvider() {
+        return new Object[][] {
+                { false },
+                { true }
+        };
+    }
 
     @BeforeMethod
     @Override
@@ -60,40 +69,22 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         super.internalCleanup();
     }
 
-    @Test
-    public void testSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
+    @Test(dataProvider = "batch")
+    public void testSendAndReceiveWithHashRangeStickyKeyConsumerSelector(boolean enableBatch) throws PulsarClientException {
         this.conf.setSubscriptionKeySharedEnable(true);
-        String topic = "persistent://public/default/key_shared";
+        String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
 
         @Cleanup
-        Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
-                .topic(topic)
-                .subscriptionName("key_shared")
-                .subscriptionType(SubscriptionType.Key_Shared)
-                .ackTimeout(3, TimeUnit.SECONDS)
-                .subscribe();
+        Consumer<Integer> consumer1 = createConsumer(topic);
 
         @Cleanup
-        Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
-                .topic(topic)
-                .subscriptionName("key_shared")
-                .subscriptionType(SubscriptionType.Key_Shared)
-                .ackTimeout(3, TimeUnit.SECONDS)
-                .subscribe();
+        Consumer<Integer> consumer2 = createConsumer(topic);
 
         @Cleanup
-        Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
-                .topic(topic)
-                .subscriptionName("key_shared")
-                .subscriptionType(SubscriptionType.Key_Shared)
-                .ackTimeout(3, TimeUnit.SECONDS)
-                .subscribe();
+        Consumer<Integer> consumer3 = createConsumer(topic);
 
         @Cleanup
-        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
-                .topic(topic)
-                .enableBatching(false)
-                .create();
+        Producer<Integer> producer = createProducer(topic, enableBatch);
 
         int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
         int consumer2Slot = consumer1Slot >> 1;
@@ -129,41 +120,23 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         receiveAndCheck(checkList);
     }
 
-    @Test
-    public void testConsumerCrashSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException, InterruptedException {
+    @Test(dataProvider = "batch")
+    public void testConsumerCrashSendAndReceiveWithHashRangeStickyKeyConsumerSelector(boolean enableBatch) throws PulsarClientException, InterruptedException {
 
         this.conf.setSubscriptionKeySharedEnable(true);
-        String topic = "persistent://public/default/key_shared_consumer_crash";
+        String topic = "persistent://public/default/key_shared_consumer_crash-" + UUID.randomUUID();
 
         @Cleanup
-        Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
-                .topic(topic)
-                .subscriptionName("key_shared")
-                .subscriptionType(SubscriptionType.Key_Shared)
-                .ackTimeout(3, TimeUnit.SECONDS)
-                .subscribe();
+        Consumer<Integer> consumer1 = createConsumer(topic);
 
         @Cleanup
-        Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
-                .topic(topic)
-                .subscriptionName("key_shared")
-                .subscriptionType(SubscriptionType.Key_Shared)
-                .ackTimeout(3, TimeUnit.SECONDS)
-                .subscribe();
+        Consumer<Integer> consumer2 = createConsumer(topic);
 
         @Cleanup
-        Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
-                .topic(topic)
-                .subscriptionName("key_shared")
-                .subscriptionType(SubscriptionType.Key_Shared)
-                .ackTimeout(3, TimeUnit.SECONDS)
-                .subscribe();
+        Consumer<Integer> consumer3 = createConsumer(topic);
 
         @Cleanup
-        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
-            .topic(topic)
-            .enableBatching(false)
-            .create();
+        Producer<Integer> producer = createProducer(topic, enableBatch);
 
         int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
         int consumer2Slot = consumer1Slot >> 1;
@@ -219,45 +192,27 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
     }
 
 
-    @Test
-    public void testNonKeySendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
+    @Test(dataProvider = "batch")
+    public void testNonKeySendAndReceiveWithHashRangeStickyKeyConsumerSelector(boolean enableBatch) throws PulsarClientException {
         this.conf.setSubscriptionKeySharedEnable(true);
-        String topic = "persistent://public/default/key_shared_none_key";
+        String topic = "persistent://public/default/key_shared_none_key-" + UUID.randomUUID();
 
         @Cleanup
-        Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
-                .topic(topic)
-                .subscriptionName("key_shared")
-                .subscriptionType(SubscriptionType.Key_Shared)
-                .ackTimeout(3, TimeUnit.SECONDS)
-                .subscribe();
+        Consumer<Integer> consumer1 = createConsumer(topic);
 
         @Cleanup
-        Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
-                .topic(topic)
-                .subscriptionName("key_shared")
-                .subscriptionType(SubscriptionType.Key_Shared)
-                .ackTimeout(3, TimeUnit.SECONDS)
-                .subscribe();
+        Consumer<Integer> consumer2 = createConsumer(topic);
 
         @Cleanup
-        Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
-                .topic(topic)
-                .subscriptionName("key_shared")
-                .subscriptionType(SubscriptionType.Key_Shared)
-                .ackTimeout(3, TimeUnit.SECONDS)
-                .subscribe();
+        Consumer<Integer> consumer3 = createConsumer(topic);
+
+        @Cleanup
+        Producer<Integer> producer = createProducer(topic, enableBatch);
 
         int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
         int consumer2Slot = consumer1Slot >> 1;
         int consumer3Slot = consumer2Slot >> 1;
 
-        @Cleanup
-        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
-                .topic(topic)
-                .enableBatching(false)
-                .create();
-
         for (int i = 0; i < 100; i++) {
             producer.newMessage()
                     .value(i)
@@ -276,40 +231,22 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         receiveAndCheck(checkList);
     }
 
-    @Test
-    public void testOrderingKeyWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
+    @Test(dataProvider = "batch")
+    public void testOrderingKeyWithHashRangeStickyKeyConsumerSelector(boolean enableBatch) throws PulsarClientException {
         this.conf.setSubscriptionKeySharedEnable(true);
-        String topic = "persistent://public/default/key_shared_ordering_key";
+        String topic = "persistent://public/default/key_shared_ordering_key-" + UUID.randomUUID();
 
         @Cleanup
-        Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
-            .topic(topic)
-            .subscriptionName("key_shared")
-            .subscriptionType(SubscriptionType.Key_Shared)
-            .ackTimeout(3, TimeUnit.SECONDS)
-            .subscribe();
+        Consumer<Integer> consumer1 = createConsumer(topic);
 
         @Cleanup
-        Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
-            .topic(topic)
-            .subscriptionName("key_shared")
-            .subscriptionType(SubscriptionType.Key_Shared)
-            .ackTimeout(3, TimeUnit.SECONDS)
-            .subscribe();
+        Consumer<Integer> consumer2 = createConsumer(topic);
 
         @Cleanup
-        Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
-            .topic(topic)
-            .subscriptionName("key_shared")
-            .subscriptionType(SubscriptionType.Key_Shared)
-            .ackTimeout(3, TimeUnit.SECONDS)
-            .subscribe();
+        Consumer<Integer> consumer3 = createConsumer(topic);
 
         @Cleanup
-        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
-            .topic(topic)
-            .enableBatching(false)
-            .create();
+        Producer<Integer> producer = createProducer(topic, enableBatch);
 
         int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
         int consumer2Slot = consumer1Slot >> 1;
@@ -358,6 +295,32 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
             .subscribe();
     }
 
+    /**
+     * Create an INT32 producer for the given topic.
+     *
+     * @param topic topic to publish to
+     * @param enableBatch when true, batching is enabled with the key based batcher; otherwise batching is disabled
+     * @return the created producer
+     * @throws PulsarClientException if the producer cannot be created
+     */
+    private Producer<Integer> createProducer(String topic, boolean enableBatch) throws PulsarClientException {
+        if (!enableBatch) {
+            return pulsarClient.newProducer(Schema.INT32)
+                    .topic(topic)
+                    .enableBatching(false)
+                    .create();
+        }
+        return pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .enableBatching(true)
+                .batcherBuilder(BatcherBuilder.KEY_BASED)
+                .create();
+    }
+
+    /**
+     * Subscribe an INT32 consumer to the topic on the "key_shared" subscription in Key_Shared mode,
+     * with a 3 second ack timeout.
+     *
+     * @param topic topic to subscribe to
+     * @return the subscribed consumer
+     * @throws PulsarClientException if the subscription fails
+     */
+    private Consumer<Integer> createConsumer(String topic) throws PulsarClientException {
+        return pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .ackTimeout(3, TimeUnit.SECONDS)
+                .subscribe();
+    }
+
     private void receiveAndCheck(List<KeyValue<Consumer<Integer>, Integer>> checkList) throws PulsarClientException {
         Map<Consumer, Set<String>> consumerKeys = new HashMap<>();
         for (KeyValue<Consumer<Integer>, Integer> check : checkList) {
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchMessageContainer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchMessageContainer.java
new file mode 100644
index 0000000..cf72962
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchMessageContainer.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Batch message container for individual messages being published until they are batched and sent to broker.
+ */
+public interface BatchMessageContainer {
+
+    /**
+     * Clear the message batch container.
+     */
+    void clear();
+
+    /**
+     * Check whether the message batch container is empty.
+     *
+     * @return true if empty, otherwise false
+     */
+    boolean isEmpty();
+
+    /**
+     * Get count of messages in the message batch container.
+     *
+     * @return messages count
+     */
+    int getNumMessagesInBatch();
+
+    /**
+     * Get current message batch size of the message batch container in bytes.
+     *
+     * @return message batch size in bytes
+     */
+    long getCurrentBatchSize();
+
+    /**
+     * Release the payload and clear the container.
+     *
+     * @param ex cause of the discard
+     */
+    void discard(Exception ex);
+
+    /**
+     * Check whether the container groups its messages into multiple batches rather than a single one.
+     *
+     * @return true if the container produces multiple batches, otherwise false
+     */
+    boolean isMultiBatches();
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatcherBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatcherBuilder.java
new file mode 100644
index 0000000..705a0ac
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatcherBuilder.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import org.apache.pulsar.client.internal.DefaultImplementation;
+
+/**
+ * Batcher builder: creates the {@link BatchMessageContainer} a producer uses to batch messages.
+ */
+public interface BatcherBuilder {
+
+    /**
+     * Default batch message container.
+     *
+     * <p>incoming single messages:
+     * (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
+     *
+     * <p>batched into single batch message:
+     * [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
+     */
+    BatcherBuilder DEFAULT = DefaultImplementation.newDefaultBatcherBuilder();
+
+    /**
+     * Key based batch message container.
+     *
+     * <p>incoming single messages:
+     * (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
+     *
+     * <p>batched into multiple batch messages:
+     * [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
+     */
+    BatcherBuilder KEY_BASED = DefaultImplementation.newKeyBasedBatcherBuilder();
+
+    /**
+     * Build a new batch message container.
+     *
+     * @return new batch message container
+     */
+    BatchMessageContainer build();
+
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index a98036c..2e1a858 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -22,8 +22,6 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import javax.swing.plaf.basic.BasicInternalFrameTitlePane.MaximizeAction;
-
 import org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
 
 /**
@@ -366,6 +364,16 @@ public interface ProducerBuilder<T> extends Cloneable {
     ProducerBuilder<T> batchingMaxMessages(int batchMessagesMaxMessagesPerBatch);
 
     /**
+     * Set the batcher builder {@link BatcherBuilder} of the producer. The producer will use the batcher builder
+     * to build a batch message container. This is only used when batching is enabled.
+     *
+     * @param batcherBuilder
+     *          batcher builder
+     * @return the producer builder instance
+     */
+    ProducerBuilder<T> batcherBuilder(BatcherBuilder batcherBuilder);
+
+    /**
      * Set the baseline for the sequence ids for messages published by the producer.
      * <p>
      * First message will be using {@code (initialSequenceId + 1)} as its sequence id and subsequent messages will be assigned
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
index d85b278..80282bf 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
@@ -36,6 +36,7 @@ import java.util.function.Supplier;
 import lombok.experimental.UtilityClass;
 
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
@@ -282,4 +283,16 @@ public class DefaultImplementation {
                 () -> (RecordSchemaBuilder) getConstructor("org.apache.pulsar.client.impl.schema.RecordSchemaBuilderImpl",
                         String.class).newInstance(name));
     }
+
+    /**
+     * Create the default batcher builder by instantiating
+     * {@code org.apache.pulsar.client.impl.DefaultBatcherBuilder} by class name, with no constructor arguments.
+     *
+     * @return a new default batcher builder
+     */
+    public static BatcherBuilder newDefaultBatcherBuilder() {
+        return catchExceptions(
+            () -> (BatcherBuilder) getConstructor("org.apache.pulsar.client.impl.DefaultBatcherBuilder")
+                    .newInstance());
+    }
+
+    /**
+     * Create the key based batcher builder by instantiating
+     * {@code org.apache.pulsar.client.impl.KeyBasedBatcherBuilder} by class name, with no constructor arguments.
+     *
+     * @return a new key based batcher builder
+     */
+    public static BatcherBuilder newKeyBasedBatcherBuilder() {
+        return catchExceptions(
+                () -> (BatcherBuilder) getConstructor("org.apache.pulsar.client.impl.KeyBasedBatcherBuilder")
+                        .newInstance());
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
new file mode 100644
index 0000000..3d6ca4a
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Batch message container framework.
+ *
+ * <p>Holds the producer-derived settings and the message count / byte size bookkeeping shared by the
+ * concrete containers. Both {@code createOpSendMsg()} and {@code createOpSendMsgs()} throw
+ * {@link UnsupportedOperationException} here; a subclass overrides the one it supports.
+ */
+public abstract class AbstractBatchMessageContainer implements BatchMessageContainerBase {
+
+    protected static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
+    protected static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024;
+
+    // Settings taken from the producer in setProducer()
+    protected ProducerImpl producer;
+    protected String topicName;
+    protected String producerName;
+    protected PulsarApi.CompressionType compressionType;
+    protected CompressionCodec compressor;
+    protected int maxNumMessagesInBatch;
+
+    // Bookkeeping for the messages accumulated so far
+    protected int numMessagesInBatch = 0;
+    protected long currentBatchSizeBytes = 0;
+
+    // This will be the largest size for a batch sent from this particular producer. This is used as a baseline to
+    // allocate a new buffer that can hold the entire batch without needing costly reallocations
+    protected int maxBatchSize = INITIAL_BATCH_BUFFER_SIZE;
+
+    /**
+     * A message fits when adding its payload keeps the batch within MAX_MESSAGE_BATCH_SIZE_BYTES and the
+     * batch currently holds fewer than maxNumMessagesInBatch messages.
+     */
+    @Override
+    public boolean haveEnoughSpace(MessageImpl<?> msg) {
+        long sizeWithMessage = currentBatchSizeBytes + msg.getDataBuffer().readableBytes();
+        boolean withinByteLimit = sizeWithMessage <= MAX_MESSAGE_BATCH_SIZE_BYTES;
+        return withinByteLimit && numMessagesInBatch < maxNumMessagesInBatch;
+    }
+
+    @Override
+    public int getNumMessagesInBatch() {
+        return this.numMessagesInBatch;
+    }
+
+    @Override
+    public long getCurrentBatchSize() {
+        return this.currentBatchSizeBytes;
+    }
+
+    /** Not supported by default; overridden by containers that produce several batches. */
+    @Override
+    public List<ProducerImpl.OpSendMsg> createOpSendMsgs() throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported by default; overridden by containers that produce a single batch. */
+    @Override
+    public ProducerImpl.OpSendMsg createOpSendMsg() throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Bind the container to its producer and copy the topic, producer name, compression settings and
+     * the max-messages-per-batch limit from it.
+     */
+    @Override
+    public void setProducer(ProducerImpl<?> producer) {
+        this.producer = producer;
+        this.topicName = producer.getTopic();
+        this.producerName = producer.getProducerName();
+        this.compressionType = CompressionCodecProvider.convertToWireProtocol(
+                producer.getConfiguration().getCompressionType());
+        this.compressor = CompressionCodecProvider.getCompressionCodec(this.compressionType);
+        this.maxNumMessagesInBatch = producer.getConfiguration().getBatchingMaxMessages();
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java
new file mode 100644
index 0000000..5f930bd
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.BatchMessageContainer;
+import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Internal view of a {@link BatchMessageContainer}: adds the operations used to fill a container from a
+ * producer and to turn its content into {@link OpSendMsg} instances.
+ */
+public interface BatchMessageContainerBase extends BatchMessageContainer {
+
+    /**
+     * Add message to the batch message container.
+     *
+     * @param msg message to add to the batch message container
+     * @param callback message send callback
+     */
+    void add(MessageImpl<?> msg, SendCallback callback);
+
+    /**
+     * Check whether the batch message container has enough space for the message to be added.
+     *
+     * @param msg the message to be added
+     * @return true if the container has enough space for the specific message,
+     *         otherwise false
+     */
+    boolean haveEnoughSpace(MessageImpl<?> msg);
+
+    /**
+     * Set producer of the message batch container.
+     *
+     * @param producer producer
+     */
+    void setProducer(ProducerImpl<?> producer);
+
+    /**
+     * Create a list of OpSendMsg; the producer uses each OpSendMsg to send a batch to the broker.
+     *
+     * @return list of OpSendMsg
+     * @throws IOException
+     */
+    List<OpSendMsg> createOpSendMsgs() throws IOException;
+
+    /**
+     * Create an OpSendMsg; the producer uses the OpSendMsg to send the batch to the broker.
+     *
+     * @return OpSendMsg
+     * @throws IOException
+     */
+    OpSendMsg createOpSendMsg() throws IOException;
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
similarity index 52%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 0e1adee..b293572 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -22,63 +22,42 @@ import com.google.common.collect.Lists;
 
 import io.netty.buffer.ByteBuf;
 
+import java.io.IOException;
 import java.util.List;
 
+import io.netty.util.ReferenceCountUtil;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
-import org.apache.pulsar.common.protocol.Commands;
+
 import org.apache.pulsar.common.api.proto.PulsarApi;
-import org.apache.pulsar.common.compression.CompressionCodec;
-import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.protocol.ByteBufPair;
+import org.apache.pulsar.common.protocol.Commands;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * container for individual messages being published until they are batched and sent to broker
+ * Default batch message container
+ *
+ * incoming single messages:
+ * (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
+ *
+ * batched into single batch message:
+ * [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
  */
+class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
 
-class BatchMessageContainer {
-
-    private SendCallback previousCallback = null;
-    private final PulsarApi.CompressionType compressionType;
-    private final CompressionCodec compressor;
-    private final String topicName;
-    private final String producerName;
-
-    final int maxNumMessagesInBatch;
-
-    PulsarApi.MessageMetadata.Builder messageMetadata = PulsarApi.MessageMetadata.newBuilder();
-    int numMessagesInBatch = 0;
-    long currentBatchSizeBytes = 0;
+    private PulsarApi.MessageMetadata.Builder messageMetadata = PulsarApi.MessageMetadata.newBuilder();
     // sequence id for this batch which will be persisted as a single entry by broker
-    long sequenceId = -1;
-    ByteBuf batchedMessageMetadataAndPayload;
-    List<MessageImpl<?>> messages = Lists.newArrayList();
+    private long sequenceId = -1;
+    private ByteBuf batchedMessageMetadataAndPayload;
+    private List<MessageImpl<?>> messages = Lists.newArrayList();
+    protected SendCallback previousCallback = null;
     // keep track of callbacks for individual messages being published in a batch
-    SendCallback firstCallback;
-
-    private static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
-    protected static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024;
-
-    // This will be the largest size for a batch sent from this particular producer. This is used as a baseline to
-    // allocate a new buffer that can hold the entire batch without needing costly reallocations
-    private int maxBatchSize = INITIAL_BATCH_BUFFER_SIZE;
-
-    BatchMessageContainer(int maxNumMessagesInBatch, PulsarApi.CompressionType compressionType, String topicName,
-            String producerName) {
-        this.maxNumMessagesInBatch = maxNumMessagesInBatch;
-        this.compressionType = compressionType;
-        this.compressor = CompressionCodecProvider.getCompressionCodec(compressionType);
-        this.topicName = topicName;
-        this.producerName = producerName;
-    }
-
-    boolean hasSpaceInBatch(MessageImpl<?> msg) {
-        int messageSize = msg.getDataBuffer().readableBytes();
-        return ((messageSize + currentBatchSizeBytes) <= MAX_MESSAGE_BATCH_SIZE_BYTES
-                && numMessagesInBatch < maxNumMessagesInBatch);
-    }
+    protected SendCallback firstCallback;
 
-    void add(MessageImpl<?> msg, SendCallback callback) {
+    @Override
+    public void add(MessageImpl<?> msg, SendCallback callback) {
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName, producerName,
@@ -98,16 +77,17 @@ class BatchMessageContainer {
             previousCallback.addCallback(msg, callback);
         }
         previousCallback = callback;
-
         currentBatchSizeBytes += msg.getDataBuffer().readableBytes();
-        PulsarApi.MessageMetadata.Builder msgBuilder = msg.getMessageBuilder();
-        batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder,
-                msg.getDataBuffer(), batchedMessageMetadataAndPayload);
         messages.add(msg);
-        msgBuilder.recycle();
     }
 
-    ByteBuf getCompressedBatchMetadataAndPayload() {
+    private ByteBuf getCompressedBatchMetadataAndPayload() {
+        for (MessageImpl<?> msg : messages) {
+            PulsarApi.MessageMetadata.Builder msgBuilder = msg.getMessageBuilder();
+            batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder,
+                    msg.getDataBuffer(), batchedMessageMetadataAndPayload);
+            msgBuilder.recycle();
+        }
         int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
         ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
         batchedMessageMetadataAndPayload.release();
@@ -122,20 +102,8 @@ class BatchMessageContainer {
         return compressedPayload;
     }
 
-    PulsarApi.MessageMetadata setBatchAndBuild() {
-        messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] [{}] num messages in batch being closed are {}", topicName, producerName,
-                    numMessagesInBatch);
-        }
-        return messageMetadata.build();
-    }
-
-    ByteBuf getBatchedSingleMessageMetadataAndPayload() {
-        return batchedMessageMetadataAndPayload;
-    }
-
-    void clear() {
+    @Override
+    public void clear() {
         messages = Lists.newArrayList();
         firstCallback = null;
         previousCallback = null;
@@ -146,9 +114,52 @@ class BatchMessageContainer {
         batchedMessageMetadataAndPayload = null;
     }
 
-    boolean isEmpty() {
+    @Override
+    public boolean isEmpty() {
         return messages.isEmpty();
     }
 
-    private static final Logger log = LoggerFactory.getLogger(BatchMessageContainer.class);
+    /**
+     * Complete the batch's first send callback with the given cause, release the batch buffer and
+     * reset the container. Does nothing beyond the reset when the container is empty.
+     *
+     * @param ex cause reported to the send callback
+     */
+    @Override
+    public void discard(Exception ex) {
+        try {
+            // firstCallback is null while the container is empty (clear() resets it), so there is nothing to
+            // complete in that case.
+            // Need to protect ourselves from any exception being thrown in the future handler from the application
+            if (firstCallback != null) {
+                firstCallback.sendComplete(ex);
+            }
+        } catch (Throwable t) {
+            log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName,
+                sequenceId, t);
+        }
+        ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload);
+        clear();
+    }
+
+    /**
+     * This container accumulates all messages into one batch.
+     *
+     * @return always false
+     */
+    @Override
+    public boolean isMultiBatches() {
+        return false;
+    }
+
+    /**
+     * Build the send operation for the accumulated batch: compress and encrypt the payload, stamp the
+     * message count into the batch metadata and wrap the resulting command in an OpSendMsg.
+     *
+     * @return the send operation, or null when the encrypted payload is bigger than the maximum message
+     *         size (the callback is then completed with an InvalidMessageException)
+     * @throws IOException
+     */
+    @Override
+    public OpSendMsg createOpSendMsg() throws IOException {
+        ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
+        messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
+        ByteBufPair cmd = producer.sendMessage(producer.producerId, sequenceId, numMessagesInBatch,
+            messageMetadata.build(), encryptedPayload);
+        OpSendMsg op = OpSendMsg.create(messages, cmd, sequenceId, firstCallback);
+
+        boolean fitsMaxMessageSize = encryptedPayload.readableBytes() <= ClientCnx.getMaxMessageSize();
+        if (fitsMaxMessageSize) {
+            op.setNumMessagesInBatch(numMessagesInBatch);
+            op.setBatchSizeByte(currentBatchSizeBytes);
+            return op;
+        }
+
+        // Oversized batch: drop the command and fail the callback instead of sending.
+        cmd.release();
+        if (op != null) {
+            op.callback.sendComplete(new PulsarClientException.InvalidMessageException(
+                "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
+            op.recycle();
+        }
+        return null;
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(BatchMessageContainerImpl.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
new file mode 100644
index 0000000..a43afda
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.protocol.ByteBufPair;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Key based batch message container
+ *
+ * incoming single messages:
+ * (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
+ *
+ * batched into multiple batch messages:
+ * [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
+ */
+class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
+
+    // One pending batch per key; the key is computed by getKey()
+    private Map<String, KeyedBatch> batches = new HashMap<>();
+
+    /**
+     * Add the message to the batch of its key, creating that batch on first use, and update the
+     * container-wide message count and byte size.
+     *
+     * @param msg message to add
+     * @param callback send callback of the message
+     */
+    @Override
+    public void add(MessageImpl<?> msg, SendCallback callback) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] [{}] add message to batch, num messages in batch so far is {}", topicName, producerName,
+                    numMessagesInBatch);
+        }
+        numMessagesInBatch++;
+        currentBatchSizeBytes += msg.getDataBuffer().readableBytes();
+        String key = getKey(msg);
+        KeyedBatch part = batches.get(key);
+        if (part == null) {
+            part = new KeyedBatch();
+            // Configure the batch before its first addMsg(): that call allocates the batch buffer using
+            // maxBatchSize, which would still be 0 if it were assigned afterwards.
+            part.compressionType = compressionType;
+            part.compressor = compressor;
+            part.maxBatchSize = maxBatchSize;
+            batches.put(key, part);
+        }
+        part.addMsg(msg, callback);
+    }
+
+    @Override
+    public void clear() {
+        numMessagesInBatch = 0;
+        currentBatchSizeBytes = 0;
+        batches = new HashMap<>();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return batches.isEmpty();
+    }
+
+    /**
+     * Complete the first callback of every pending batch with the given cause, release the batch
+     * buffers and reset the container.
+     *
+     * @param ex cause reported to the send callbacks
+     */
+    @Override
+    public void discard(Exception ex) {
+        // Iterate over a snapshot so a callback that touches the container cannot break the iteration.
+        List<KeyedBatch> pending = new ArrayList<>(batches.values());
+        for (KeyedBatch batch : pending) {
+            try {
+                // Need to protect ourselves from any exception being thrown in the future handler from the
+                // application. The guard is per batch so one failing callback does not leave the callbacks of
+                // the remaining batches uncompleted.
+                batch.firstCallback.sendComplete(ex);
+            } catch (Throwable t) {
+                log.warn("[{}] [{}] Got exception while completing the callback", topicName, producerName, t);
+            }
+        }
+        for (KeyedBatch batch : pending) {
+            ReferenceCountUtil.safeRelease(batch.batchedMessageMetadataAndPayload);
+        }
+        clear();
+    }
+
+    @Override
+    public boolean isMultiBatches() {
+        return true;
+    }
+
+    /**
+     * Build the send operation for one keyed batch.
+     *
+     * @return the send operation, or null when the encrypted payload is bigger than the maximum message
+     *         size (the batch callback is then completed with an InvalidMessageException)
+     */
+    private ProducerImpl.OpSendMsg createOpSendMsg(KeyedBatch keyedBatch) throws IOException {
+        ByteBuf encryptedPayload = producer.encryptMessage(keyedBatch.messageMetadata,
+                keyedBatch.getCompressedBatchMetadataAndPayload());
+        // Per-batch figures; named differently from the inherited container-wide counters on purpose.
+        final int batchMessageCount = keyedBatch.messages.size();
+        long batchSizeBytes = 0;
+        for (MessageImpl<?> message : keyedBatch.messages) {
+            batchSizeBytes += message.getDataBuffer().readableBytes();
+        }
+        keyedBatch.messageMetadata.setNumMessagesInBatch(batchMessageCount);
+        ByteBufPair cmd = producer.sendMessage(producer.producerId, keyedBatch.sequenceId, batchMessageCount,
+                keyedBatch.messageMetadata.build(), encryptedPayload);
+
+        ProducerImpl.OpSendMsg op = ProducerImpl.OpSendMsg.create(keyedBatch.messages, cmd, keyedBatch.sequenceId,
+                keyedBatch.firstCallback);
+
+        if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
+            cmd.release();
+            if (op != null) {
+                op.callback.sendComplete(new PulsarClientException.InvalidMessageException(
+                        "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
+                op.recycle();
+            }
+            return null;
+        }
+        op.setNumMessagesInBatch(batchMessageCount);
+        op.setBatchSizeByte(batchSizeBytes);
+        return op;
+    }
+
+    /**
+     * Create one send operation per keyed batch, ordered by the sequence id of each batch. Batches
+     * whose payload is too large are failed and left out of the result.
+     *
+     * @return send operations for the pending batches
+     * @throws IOException
+     */
+    @Override
+    public List<ProducerImpl.OpSendMsg> createOpSendMsgs() throws IOException {
+        List<ProducerImpl.OpSendMsg> result = new ArrayList<>();
+        List<KeyedBatch> list = new ArrayList<>(batches.values());
+        list.sort(((o1, o2) -> ComparisonChain.start()
+                .compare(o1.sequenceId, o2.sequenceId)
+                .result()));
+        for (KeyedBatch keyedBatch : list) {
+            ProducerImpl.OpSendMsg op = createOpSendMsg(keyedBatch);
+            if (op != null) {
+                result.add(op);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Batch key of a message: the Base64 form of its ordering key when it has one, otherwise its key.
+     */
+    private String getKey(MessageImpl<?> msg) {
+        if (msg.hasOrderingKey()) {
+            return Base64.getEncoder().encodeToString(msg.getOrderingKey());
+        }
+        return msg.getKey();
+    }
+
+    /**
+     * The messages, callbacks and batch metadata accumulated for a single key.
+     */
+    private static class KeyedBatch {
+        private PulsarApi.MessageMetadata.Builder messageMetadata = PulsarApi.MessageMetadata.newBuilder();
+        // sequence id for this batch which will be persisted as a single entry by broker
+        private long sequenceId = -1;
+        private ByteBuf batchedMessageMetadataAndPayload;
+        private List<MessageImpl<?>> messages = Lists.newArrayList();
+        private SendCallback previousCallback = null;
+        private PulsarApi.CompressionType compressionType;
+        private CompressionCodec compressor;
+        private int maxBatchSize;
+
+        // keep track of callbacks for individual messages being published in a batch
+        private SendCallback firstCallback;
+
+        /**
+         * Serialize every message into the batch buffer, compress it, release the uncompressed buffer
+         * and record the compression settings in the batch metadata.
+         */
+        private ByteBuf getCompressedBatchMetadataAndPayload() {
+            for (MessageImpl<?> msg : messages) {
+                PulsarApi.MessageMetadata.Builder msgBuilder = msg.getMessageBuilder();
+                batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder,
+                        msg.getDataBuffer(), batchedMessageMetadataAndPayload);
+                msgBuilder.recycle();
+            }
+            int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
+            ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
+            batchedMessageMetadataAndPayload.release();
+            if (compressionType != PulsarApi.CompressionType.NONE) {
+                messageMetadata.setCompression(compressionType);
+                messageMetadata.setUncompressedSize(uncompressedSize);
+            }
+
+            // Update the current max batch size using the uncompressed size, which is what we need in any case to
+            // accumulate the batch content
+            maxBatchSize = Math.max(maxBatchSize, uncompressedSize);
+            return compressedPayload;
+        }
+
+        /**
+         * Append a message. The first message initializes the batch metadata (sequence id, partition
+         * key, ordering key), allocates the batch buffer and provides the first callback; later
+         * callbacks are chained onto the previous one.
+         */
+        private void addMsg(MessageImpl<?> msg, SendCallback callback) {
+            if (messages.isEmpty()) {
+                sequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
+                if (msg.hasKey()) {
+                    messageMetadata.setPartitionKey(msg.getKey());
+                    if (msg.hasBase64EncodedKey()) {
+                        messageMetadata.setPartitionKeyB64Encoded(true);
+                    }
+                }
+                if (msg.hasOrderingKey()) {
+                    messageMetadata.setOrderingKey(ByteString.copyFrom(msg.getOrderingKey()));
+                }
+                batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
+                        .buffer(Math.min(maxBatchSize, MAX_MESSAGE_BATCH_SIZE_BYTES));
+                firstCallback = callback;
+            }
+            if (previousCallback != null) {
+                previousCallback.addCallback(msg, callback);
+            }
+            previousCallback = callback;
+            messages.add(msg);
+        }
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(BatchMessageKeyBasedContainer.class);
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultBatcherBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultBatcherBuilder.java
new file mode 100644
index 0000000..17c2eb4
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultBatcherBuilder.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.BatchMessageContainer;
+import org.apache.pulsar.client.api.BatcherBuilder;
+
+public class DefaultBatcherBuilder implements BatcherBuilder {
+    /** Builds a {@link BatchMessageContainerImpl}; a new container instance is created on every call. */
+    @Override
+    public BatchMessageContainer build() {
+        return new BatchMessageContainerImpl();
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/KeyBasedBatcherBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/KeyBasedBatcherBuilder.java
new file mode 100644
index 0000000..8a4e09e
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/KeyBasedBatcherBuilder.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.BatchMessageContainer;
+import org.apache.pulsar.client.api.BatcherBuilder;
+
+public class KeyBasedBatcherBuilder implements BatcherBuilder {
+    /** Builds a {@link BatchMessageKeyBasedContainer}; a new container instance is created on every call. */
+    @Override
+    public BatchMessageContainer build() {
+        return new BatchMessageKeyBasedContainer();
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 888b58e..77f4dbe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.HashingScheme;
@@ -215,6 +216,13 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
     }
 
     @Override
+    public ProducerBuilder<T> batcherBuilder(BatcherBuilder batcherBuilder) {
+        conf.setBatcherBuilder(batcherBuilder); // null is accepted: ProducerImpl then uses BatcherBuilder.DEFAULT
+        return this; // fluent builder: allows call chaining
+    }
+
+
+    @Override
     public ProducerBuilder<T> initialSequenceId(long initialSequenceId) {
         conf.setInitialSequenceId(initialSequenceId);
         return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index ab6b5b6..48656f8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
+import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -75,7 +76,7 @@ import org.slf4j.LoggerFactory;
 public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, ConnectionHandler.Connection {
 
     // Producer id, used to identify a producer within a single connection
-    private final long producerId;
+    protected final long producerId;
 
     // Variable is used through the atomic updater
     private volatile long msgIdGenerator;
@@ -87,7 +88,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     private volatile Timeout batchMessageAndSendTimeout = null;
     private long createProducerTimeout;
     private final int maxNumMessagesInBatch;
-    private final BatchMessageContainer batchMessageContainer;
+    private final BatchMessageContainerBase batchMessageContainer;
     private CompletableFuture<MessageId> lastSendFuture = CompletableFuture.completedFuture(null);
 
     // Globally unique producer name
@@ -163,8 +164,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         this.createProducerTimeout = System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs();
         if (conf.isBatchingEnabled()) {
             this.maxNumMessagesInBatch = conf.getBatchingMaxMessages();
-            this.batchMessageContainer = new BatchMessageContainer(maxNumMessagesInBatch,
-                    CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType()), topic, producerName);
+            BatcherBuilder containerBuilder = conf.getBatcherBuilder();
+            if (containerBuilder == null) {
+                containerBuilder = BatcherBuilder.DEFAULT;
+            }
+            this.batchMessageContainer = (BatchMessageContainerBase)containerBuilder.build();
+            this.batchMessageContainer.setProducer(this);
         } else {
             this.maxNumMessagesInBatch = 1;
             this.batchMessageContainer = null;
@@ -363,12 +368,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 if (isBatchMessagingEnabled() && !msgMetadataBuilder.hasDeliverAtTime()) {
                     // handle boundary cases where message being added would exceed
                     // batch size and/or max message size
-                    if (batchMessageContainer.hasSpaceInBatch(msg)) {
+                    if (batchMessageContainer.haveEnoughSpace(msg)) {
                         batchMessageContainer.add(msg, callback);
                         lastSendFuture = callback.getFuture();
                         payload.release();
-                        if (batchMessageContainer.numMessagesInBatch == maxNumMessagesInBatch
-                                || batchMessageContainer.currentBatchSizeBytes >= BatchMessageContainer.MAX_MESSAGE_BATCH_SIZE_BYTES) {
+                        if (batchMessageContainer.getNumMessagesInBatch() == maxNumMessagesInBatch
+                                || batchMessageContainer.getCurrentBatchSize() >= BatchMessageContainerImpl.MAX_MESSAGE_BATCH_SIZE_BYTES) {
                             batchMessageAndSend();
                         }
                     } else {
@@ -425,7 +430,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         }
     }
 
-    private ByteBuf encryptMessage(MessageMetadata.Builder msgMetadata, ByteBuf compressedPayload)
+    protected ByteBuf encryptMessage(MessageMetadata.Builder msgMetadata, ByteBuf compressedPayload)
             throws PulsarClientException {
 
         ByteBuf encryptedPayload = compressedPayload;
@@ -447,7 +452,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         return encryptedPayload;
     }
 
-    private ByteBufPair sendMessage(long producerId, long sequenceId, int numMessages, MessageMetadata msgMetadata,
+    protected ByteBufPair sendMessage(long producerId, long sequenceId, int numMessages, MessageMetadata msgMetadata,
             ByteBuf compressedPayload) throws IOException {
         ChecksumType checksumType;
 
@@ -1224,17 +1229,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         if (batchMessageContainer.isEmpty()) {
             return;
         }
-        int numMessagesInBatch = batchMessageContainer.numMessagesInBatch;
+        int numMessagesInBatch = batchMessageContainer.getNumMessagesInBatch();
         semaphore.release(numMessagesInBatch);
-        try {
-            // Need to protect ourselves from any exception being thrown in the future handler from the application
-            batchMessageContainer.firstCallback.sendComplete(ex);
-        } catch (Throwable t) {
-            log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName,
-                    batchMessageContainer.sequenceId, t);
-        }
-        ReferenceCountUtil.safeRelease(batchMessageContainer.getBatchedSingleMessageMetadataAndPayload());
-        batchMessageContainer.clear();
+        batchMessageContainer.discard(ex);
     }
 
     TimerTask batchMessageAndSendTask = new TimerTask() {
@@ -1288,68 +1285,56 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     private void batchMessageAndSend() {
         if (log.isDebugEnabled()) {
             log.debug("[{}] [{}] Batching the messages from the batch container with {} messages", topic, producerName,
-                    batchMessageContainer.numMessagesInBatch);
+                batchMessageContainer.getNumMessagesInBatch());
         }
-        OpSendMsg op = null;
-        int numMessagesInBatch = 0;
-        try {
-            if (!batchMessageContainer.isEmpty()) {
-                numMessagesInBatch = batchMessageContainer.numMessagesInBatch;
-                ByteBuf compressedPayload = batchMessageContainer.getCompressedBatchMetadataAndPayload();
-                long sequenceId = batchMessageContainer.sequenceId;
-                ByteBuf encryptedPayload = encryptMessage(batchMessageContainer.messageMetadata, compressedPayload);
-
-                ByteBufPair cmd = sendMessage(producerId, sequenceId, batchMessageContainer.numMessagesInBatch,
-                        batchMessageContainer.setBatchAndBuild(), encryptedPayload);
-
-                op = OpSendMsg.create(batchMessageContainer.messages, cmd, sequenceId,
-                        batchMessageContainer.firstCallback);
-
-                if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
-                    cmd.release();
-                    semaphore.release(numMessagesInBatch);
-                    if (op != null) {
-                        op.callback.sendComplete(new PulsarClientException.InvalidMessageException(
-                            "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
+        if (!batchMessageContainer.isEmpty()) { // nothing to do when no message has been batched
+            try {
+                if (batchMessageContainer.isMultiBatches()) { // container produces one OpSendMsg per batch it holds
+                    List<OpSendMsg> opSendMsgs = batchMessageContainer.createOpSendMsgs();
+                    for (OpSendMsg opSendMsg : opSendMsgs) {
+                        processOpSendMsg(opSendMsg);
                     }
-                    return;
-                }
-
-                op.setNumMessagesInBatch(batchMessageContainer.numMessagesInBatch);
-                op.setBatchSizeByte(batchMessageContainer.currentBatchSizeBytes);
-
-                batchMessageContainer.clear();
-
-                pendingMessages.put(op);
-
-                ClientCnx cnx = cnx();
-                if (isConnected()) {
-                    // If we do have a connection, the message is sent immediately, otherwise we'll try again once a new
-                    // connection is established
-                    cmd.retain();
-                    cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
-                    stats.updateNumMsgsSent(numMessagesInBatch, op.batchSizeByte);
                 } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] [{}] Connection is not ready -- sequenceId {}", topic, producerName,
-                                sequenceId);
+                    OpSendMsg opSendMsg = batchMessageContainer.createOpSendMsg();
+                    if (opSendMsg != null) { // NOTE(review): null presumably means the batch was dropped -- confirm
+                        processOpSendMsg(opSendMsg);
                     }
                 }
+            } catch (PulsarClientException e) { // not an interruption: leave the thread's interrupt flag alone
+                log.warn("[{}] [{}] error while create opSendMsg by batch message container", topic, producerName, e);
+                semaphore.release(batchMessageContainer.getNumMessagesInBatch()); // callbacks are not completed here
+            } catch (Throwable t) {
+                semaphore.release(batchMessageContainer.getNumMessagesInBatch());
+                log.warn("[{}] [{}] error while create opSendMsg by batch message container", topic, producerName, t);
+            }
+        }
+    }
+
+    private void processOpSendMsg(OpSendMsg op) {
+        try {
+            batchMessageContainer.clear();
+            pendingMessages.put(op);
+            ClientCnx cnx = cnx();
+            if (isConnected()) {
+                // If we do have a connection, the message is sent immediately, otherwise we'll try again once a new
+                // connection is established
+                op.cmd.retain();
+                cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
+                stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] [{}] Connection is not ready -- sequenceId {}", topic, producerName,
+                        op.sequenceId);
+                }
             }
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
-            semaphore.release(numMessagesInBatch);
+            semaphore.release(op.numMessagesInBatch);
             if (op != null) {
                 op.callback.sendComplete(new PulsarClientException(ie));
             }
-        } catch (PulsarClientException e) {
-            Thread.currentThread().interrupt();
-            semaphore.release(numMessagesInBatch);
-            if (op != null) {
-                op.callback.sendComplete(e);
-            }
         } catch (Throwable t) {
-            semaphore.release(numMessagesInBatch);
+            semaphore.release(op.numMessagesInBatch);
             log.warn("[{}] [{}] error while closing out batch -- {}", topic, producerName, t);
             if (op != null) {
                 op.callback.sendComplete(new PulsarClientException(t));
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index 7ec8f90..5720420 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -26,8 +26,8 @@ import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 
 import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.NoArgsConstructor;
+import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.HashingScheme;
@@ -66,6 +66,7 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
     private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);
     private int batchingMaxMessages = 1000;
     private boolean batchingEnabled = true; // enabled by default
+    private BatcherBuilder batcherBuilder = BatcherBuilder.DEFAULT;
 
     @JsonIgnore
     private CryptoKeyReader cryptoKeyReader;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
index 626c501..bb0092f 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
@@ -59,6 +59,7 @@ public class ConfigurationDataUtilsTest {
         Map<String, Object> config = new HashMap<>();
         config.put("producerName", "test-producer");
         config.put("batchingEnabled", false);
+        confData.setBatcherBuilder(null);
         confData = ConfigurationDataUtils.loadData(config, confData, ProducerConfigurationData.class);
         assertEquals("test-producer", confData.getProducerName());
         assertEquals(false, confData.isBatchingEnabled());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 1de08cf..ba801d9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1158,6 +1158,9 @@ public class Commands {
             singleMessageMetadataBuilder = singleMessageMetadataBuilder.setPartitionKey(msgBuilder.getPartitionKey())
                 .setPartitionKeyB64Encoded(msgBuilder.getPartitionKeyB64Encoded());
         }
+        if (msgBuilder.hasOrderingKey()) {
+            singleMessageMetadataBuilder = singleMessageMetadataBuilder.setOrderingKey(msgBuilder.getOrderingKey());
+        }
         if (!msgBuilder.getPropertiesList().isEmpty()) {
             singleMessageMetadataBuilder = singleMessageMetadataBuilder
                     .addAllProperties(msgBuilder.getPropertiesList());
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
index 8432b1c..d331ca2 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
@@ -91,6 +91,7 @@ public class PulsarBolt extends BaseRichBolt implements IMetric {
         this.producerConf = producerConf;
         this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
         this.producerConf.setTopicName(pulsarBoltConf.getTopic());
+        this.producerConf.setBatcherBuilder(null);
     }
     
     @SuppressWarnings({ "rawtypes" })