You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2020/06/02 07:40:13 UTC

[pulsar] branch master updated: PIP 37: [pulsar-client] support large message size (#4400)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 870a637  PIP 37: [pulsar-client] support large message size (#4400)
870a637 is described below

commit 870a637b4906862a611e418341dd926e21458f08
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Jun 2 00:39:55 2020 -0700

    PIP 37: [pulsar-client] support large message size (#4400)
    
    * PIP 37: [pulsar-client] support large message size
    
    fix producer
    
    fix ref counts
    
    add timeouts
    
    add validation
    
    fix recycling
    
    fix stats
    
    review
    
    fix test
    
    fix test
    
    fix send message and expiry-consumer-config
    
    fix schema test
    
    fix chunk properties
    
    * fix test
---
 .../broker/admin/impl/PersistentTopicsBase.java    |   4 +
 .../broker/service/AbstractBaseDispatcher.java     |   3 +
 .../org/apache/pulsar/broker/service/Consumer.java |   7 +-
 .../org/apache/pulsar/broker/service/Producer.java |  43 ++-
 .../pulsar/broker/service/SendMessageInfo.java     |   2 +
 .../apache/pulsar/broker/service/ServerCnx.java    |   5 +-
 .../NonPersistentDispatcherMultipleConsumers.java  |   4 +-
 ...onPersistentDispatcherSingleActiveConsumer.java |   4 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |   4 +-
 .../PersistentDispatcherMultipleConsumers.java     |   3 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |   5 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |   7 +-
 .../service/persistent/PersistentSubscription.java |   1 +
 .../broker/service/persistent/PersistentTopic.java |  12 +
 ...ntStickyKeyDispatcherMultipleConsumersTest.java |   2 +
 .../pulsar/client/impl/MessageChunkingTest.java    | 379 +++++++++++++++++++++
 .../apache/pulsar/client/api/ConsumerBuilder.java  |  45 +++
 .../apache/pulsar/client/api/ProducerBuilder.java  |  27 ++
 .../org/apache/pulsar/client/cli/CmdConsume.java   |  21 +-
 .../org/apache/pulsar/client/cli/CmdProduce.java   |  13 +-
 .../apache/pulsar/client/impl/ConsumerBase.java    |   3 +
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  18 +
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 254 ++++++++++++--
 .../client/impl/MultiTopicsConsumerImpl.java       |   5 +-
 .../pulsar/client/impl/NegativeAcksTracker.java    |   2 +
 .../PersistentAcknowledgmentsGroupingTracker.java  |  73 +++-
 .../pulsar/client/impl/ProducerBuilderImpl.java    |   9 +
 .../apache/pulsar/client/impl/ProducerImpl.java    | 237 ++++++++-----
 .../pulsar/client/impl/UnAckedMessageTracker.java  |  14 +
 .../impl/conf/ConsumerConfigurationData.java       |   7 +
 .../impl/conf/ProducerConfigurationData.java       |   1 +
 .../impl/AcknowledgementsGroupingTrackerTest.java  |   2 +
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 322 +++++++++++++++++
 .../pulsar/common/policies/data/ConsumerStats.java |   3 +
 .../policies/data/NonPersistentPublisherStats.java |   2 +-
 .../common/policies/data/PublisherStats.java       |   3 +
 .../common/policies/data/SubscriptionStats.java    |   3 +
 .../pulsar/common/policies/data/TopicStats.java    |   3 +
 .../apache/pulsar/common/protocol/Commands.java    |   6 +
 pulsar-common/src/main/proto/PulsarApi.proto       |   5 +
 .../pulsar/testclient/PerformanceConsumer.java     |  21 +-
 .../pulsar/testclient/PerformanceProducer.java     |  13 +-
 42 files changed, 1450 insertions(+), 147 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index a8ab7fd..99f49a3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1931,6 +1931,10 @@ public class PersistentTopicsBase extends AdminResource {
         if (metadata.hasNullValue()) {
             responseBuilder.header("X-Pulsar-null-value", metadata.hasNullValue());
         }
+        if (metadata.getNumChunksFromMsg() > 0) {
+            responseBuilder.header("X-Pulsar-PROPERTY-TOTAL-CHUNKS", Integer.toString(metadata.getNumChunksFromMsg()));
+            responseBuilder.header("X-Pulsar-PROPERTY-CHUNK-ID", Integer.toString(metadata.getChunkId()));
+        }
 
         // Decode if needed
         CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 849d626..7985f30 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -71,6 +71,7 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
              SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor) {
         int totalMessages = 0;
         long totalBytes = 0;
+        int totalChunkedMessages = 0;
 
         for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
             Entry entry = entries.get(i);
@@ -103,6 +104,7 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
                 int batchSize = msgMetadata.getNumMessagesInBatch();
                 totalMessages += batchSize;
                 totalBytes += metadataAndPayload.readableBytes();
+                totalChunkedMessages += msgMetadata.hasChunkId() ? 1: 0;
                 batchSizes.setBatchSize(i, batchSize);
                 if (indexesAcks != null && cursor != null) {
                     long[] ackSet = cursor.getDeletedBatchIndexesAsLongArray(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
@@ -119,6 +121,7 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
 
         sendMessageInfo.setTotalMessages(totalMessages);
         sendMessageInfo.setTotalBytes(totalBytes);
+        sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
     }
 
     private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index d14ea5e..2131b18 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -87,6 +87,7 @@ public class Consumer {
 
     private long lastConsumedTimestamp;
     private long lastAckedTimestamp;
+    private Rate chuckedMessageRate;
 
     // Represents how many messages we can safely send to the consumer without
     // overflowing its receiving queue. The consumer will use Flow commands to
@@ -146,6 +147,7 @@ public class Consumer {
         this.keySharedMeta = keySharedMeta;
         this.cnx = cnx;
         this.msgOut = new Rate();
+        this.chuckedMessageRate = new Rate();
         this.msgRedeliver = new Rate();
         this.bytesOutCounter = new LongAdder();
         this.msgOutCounter = new LongAdder();
@@ -214,7 +216,7 @@ public class Consumer {
      */
 
     public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
-               int totalMessages, long totalBytes, RedeliveryTracker redeliveryTracker) {
+               int totalMessages, long totalBytes, long totalChunkedMessages, RedeliveryTracker redeliveryTracker) {
         this.lastConsumedTimestamp = System.currentTimeMillis();
         final ChannelHandlerContext ctx = cnx.ctx();
         final ChannelPromise writePromise = ctx.newPromise();
@@ -258,6 +260,7 @@ public class Consumer {
         msgOut.recordMultipleEvents(totalMessages, totalBytes);
         msgOutCounter.add(totalMessages);
         bytesOutCounter.add(totalBytes);
+        chuckedMessageRate.recordMultipleEvents(totalChunkedMessages, 0);
 
         ctx.channel().eventLoop().execute(() -> {
             for (int i = 0; i < entries.size(); i++) {
@@ -505,10 +508,12 @@ public class Consumer {
 
     public void updateRates() {
         msgOut.calculateRate();
+        chuckedMessageRate.calculateRate();
         msgRedeliver.calculateRate();
         stats.msgRateOut = msgOut.getRate();
         stats.msgThroughputOut = msgOut.getValueRate();
         stats.msgRateRedeliver = msgRedeliver.getRate();
+        stats.chuckedMessageRate = chuckedMessageRate.getRate();
     }
 
     public ConsumerStats getStats() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index eeb12e9..b19c4b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -66,6 +66,7 @@ public class Producer {
     private final long producerId;
     private final String appId;
     private Rate msgIn;
+    private Rate chuckedMessageRate;
     // it records msg-drop rate only for non-persistent topic
     private final Rate msgDrop;
     private AuthenticationDataSource authenticationData;
@@ -100,6 +101,7 @@ public class Producer {
         this.appId = appId;
         this.authenticationData = cnx.authenticationData;
         this.msgIn = new Rate();
+        this.chuckedMessageRate = new Rate();
         this.isNonPersistentTopic = topic instanceof NonPersistentTopic;
         this.msgDrop = this.isNonPersistentTopic ? new Rate() : null;
 
@@ -136,13 +138,13 @@ public class Producer {
         return false;
     }
 
-    public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) {
+    public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize, boolean isChunked) {
         beforePublish(producerId, sequenceId, headersAndPayload, batchSize);
-        publishMessageToTopic(headersAndPayload, sequenceId, batchSize);
+        publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked);
     }
 
     public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId,
-           ByteBuf headersAndPayload, long batchSize) {
+           ByteBuf headersAndPayload, long batchSize, boolean isChunked) {
         if (lowestSequenceId > highestSequenceId) {
             cnx.ctx().channel().eventLoop().execute(() -> {
                 cnx.ctx().writeAndFlush(Commands.newSendError(producerId, highestSequenceId, ServerError.MetadataError,
@@ -152,7 +154,7 @@ public class Producer {
             return;
         }
         beforePublish(producerId, highestSequenceId, headersAndPayload, batchSize);
-        publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize);
+        publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked);
     }
 
     public void beforePublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) {
@@ -197,16 +199,16 @@ public class Producer {
         startPublishOperation((int) batchSize, headersAndPayload.readableBytes());
     }
 
-    private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize) {
+    private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked) {
         topic.publishMessage(headersAndPayload,
                 MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
-                        System.nanoTime()));
+                        isChunked, System.nanoTime()));
     }
 
-    private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId, long batchSize) {
+    private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId, long batchSize, boolean isChunked) {
         topic.publishMessage(headersAndPayload,
                 MessagePublishContext.get(this, lowestSequenceId, highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
-                        System.nanoTime()));
+                        isChunked, System.nanoTime()));
     }
 
     private boolean verifyChecksum(ByteBuf headersAndPayload) {
@@ -286,6 +288,8 @@ public class Producer {
         private Rate rateIn;
         private int msgSize;
         private long batchSize;
+        private boolean chunked;
+
         private long startTimeNs;
 
         private String originalProducerName;
@@ -302,6 +306,10 @@ public class Producer {
             return sequenceId;
         }
 
+        public boolean isChunked() {
+            return chunked;
+        }
+
         @Override
         public long getHighestSequenceId() {
             return highestSequenceId;
@@ -387,18 +395,22 @@ public class Producer {
                     Commands.newSendReceipt(producer.producerId, sequenceId, highestSequenceId, ledgerId, entryId),
                     producer.cnx.ctx().voidPromise());
             producer.cnx.completedSendOperation(producer.isNonPersistentTopic, msgSize);
+            if (this.chunked) {
+                producer.chuckedMessageRate.recordEvent();
+            }
             producer.publishOperationCompleted();
             recycle();
         }
 
         static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize,
-                long batchSize, long startTimeNs) {
+                long batchSize, boolean chunked, long startTimeNs) {
             MessagePublishContext callback = RECYCLER.get();
             callback.producer = producer;
             callback.sequenceId = sequenceId;
             callback.rateIn = rateIn;
             callback.msgSize = msgSize;
             callback.batchSize = batchSize;
+            callback.chunked = chunked;
             callback.originalProducerName = null;
             callback.originalSequenceId = -1L;
             callback.startTimeNs = startTimeNs;
@@ -406,7 +418,7 @@ public class Producer {
         }
 
         static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn,
-                 int msgSize, long batchSize, long startTimeNs) {
+                 int msgSize, long batchSize, boolean chunked, long startTimeNs) {
             MessagePublishContext callback = RECYCLER.get();
             callback.producer = producer;
             callback.sequenceId = lowestSequenceId;
@@ -417,6 +429,7 @@ public class Producer {
             callback.originalProducerName = null;
             callback.originalSequenceId = -1L;
             callback.startTimeNs = startTimeNs;
+            callback.chunked = chunked;
             return callback;
         }
 
@@ -444,6 +457,11 @@ public class Producer {
             entryId = -1L;
             batchSize = 0L;
             startTimeNs = -1L;
+            ledgerId = -1;
+            entryId = -1;
+            batchSize = 0;
+            chunked = false;
+            startTimeNs = -1;
             recyclerHandle.recycle(this);
         }
     }
@@ -525,9 +543,14 @@ public class Producer {
 
     public void updateRates() {
         msgIn.calculateRate();
+        chuckedMessageRate.calculateRate();
         stats.msgRateIn = msgIn.getRate();
         stats.msgThroughputIn = msgIn.getValueRate();
         stats.averageMsgSize = msgIn.getAverageValue();
+        stats.chunkedMessageRate = chuckedMessageRate.getRate();
+        if (chuckedMessageRate.getCount() > 0 && this.topic instanceof PersistentTopic) {
+            ((PersistentTopic) this.topic).msgChunkPublished = true;
+        }
         if (this.isNonPersistentTopic) {
             msgDrop.calculateRate();
             ((NonPersistentPublisherStats) stats).msgDropRate = msgDrop.getRate();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SendMessageInfo.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SendMessageInfo.java
index 3cc0ca2..a5df8dd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SendMessageInfo.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SendMessageInfo.java
@@ -26,6 +26,7 @@ import lombok.Data;
 public class SendMessageInfo {
     private int totalMessages;
     private long totalBytes;
+    private int totalChunkedMessages;
 
     private SendMessageInfo() {
         // Private constructor so that all usages are through the thread-local instance
@@ -35,6 +36,7 @@ public class SendMessageInfo {
         SendMessageInfo smi = THREAD_LOCAL.get();
         smi.totalMessages = 0;
         smi.totalBytes = 0;
+        smi.totalChunkedMessages = 0;
         return smi;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 669b472..125d9bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1214,9 +1214,10 @@ public class ServerCnx extends PulsarHandler {
         // Persist the message
         if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
             producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
-                    headersAndPayload, send.getNumMessages());
+                    headersAndPayload, send.getNumMessages(), send.getIsChunk());
         } else {
-            producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, send.getNumMessages());
+            producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
+                    send.getNumMessages(), send.getIsChunk());
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index d97e0db..2053f5c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -203,7 +203,7 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
             EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
             filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null);
             consumer.sendMessages(entries, batchSizes, null, sendMessageInfo.getTotalMessages(),
-                    sendMessageInfo.getTotalBytes(), getRedeliveryTracker());
+                    sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
 
             TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
         } else {
@@ -234,4 +234,4 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
 
     private static final Logger log = LoggerFactory.getLogger(NonPersistentDispatcherMultipleConsumers.class);
 
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 7db50cc..160d165 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -64,7 +64,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
             EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
             filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null);
             currentConsumer.sendMessages(entries, batchSizes, null, sendMessageInfo.getTotalMessages(),
-                    sendMessageInfo.getTotalBytes(), getRedeliveryTracker());
+                    sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
         } else {
             entries.forEach(entry -> {
                 int totalMsgs = Commands.getNumberOfMessagesInBatch(entry.getDataBuffer(), subscription.toString(), -1);
@@ -154,4 +154,4 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
     protected void cancelPendingRead() {
         // No-op
     }
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 76ac571..b01a432 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -81,7 +81,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
                     EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesWithSameKey.getValue().size());
                     filterEntriesForConsumer(entriesWithSameKey.getValue(), batchSizes, sendMessageInfo, null, null);
                     consumer.sendMessages(entriesWithSameKey.getValue(), batchSizes, null, sendMessageInfo.getTotalMessages(),
-                            sendMessageInfo.getTotalBytes(), getRedeliveryTracker());
+                            sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
                     TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
                 } else {
                     entries.forEach(entry -> {
@@ -95,4 +95,4 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
             }
         }
     }
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 7514103..1402b40 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -43,6 +43,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadE
 import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.logging.log4j.util.Strings;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
@@ -523,7 +524,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 filterEntriesForConsumer(entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
 
                 c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
-                        sendMessageInfo.getTotalBytes(), redeliveryTracker);
+                        sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
 
                 int msgSent = sendMessageInfo.getTotalMessages();
                 start += messagesForC;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 4d856e5..263bd7f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -246,7 +246,8 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
 
             currentConsumer
                     .sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
-                            sendMessageInfo.getTotalBytes(), redeliveryTracker)
+                            sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
+                            redeliveryTracker)
                     .addListener(future -> {
                         if (future.isSuccess()) {
                             // acquire message-dispatch permits for already delivered messages
@@ -555,4 +556,4 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
     }
 
     private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 22e45d5..6079d31 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -116,11 +116,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                 filterEntriesForConsumer(subList, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
 
                 consumer.sendMessages(subList, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
-                        sendMessageInfo.getTotalBytes(), getRedeliveryTracker()).addListener(future -> {
+                        sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
+                        getRedeliveryTracker()).addListener(future -> {
                             if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) {
                                 readMoreEntries();
                             }
-                });
+                        });
 
                 for (int i = 0; i < messagesForC; i++) {
                     entriesWithSameKey.getValue().remove(0);
@@ -175,4 +176,4 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
     private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);
 
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 041f511..c207832 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -997,6 +997,7 @@ public class PersistentSubscription implements Subscription {
                 subStats.bytesOutCounter += consumerStats.bytesOutCounter;
                 subStats.msgOutCounter += consumerStats.msgOutCounter;
                 subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
+                subStats.chuckedMessageRate += consumerStats.chuckedMessageRate;
                 subStats.unackedMessages += consumerStats.unackedMessages;
                 subStats.lastConsumedTimestamp = Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp);
                 subStats.lastAckedTimestamp = Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ad2c57e..3b6ea4a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -153,6 +153,17 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
     private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
 
+    private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
+
+    // Timestamp of when this topic was last seen active
+    private volatile long lastActive;
+    
+    // topic has ever published a chunked message since the topic was loaded
+    public boolean msgChunkPublished;
+
+    // Flag to signal that a producer of this topic has published a batch message, so the broker should not allow
+    // consumers that don't support batch messages
+    private volatile boolean hasBatchMessagePublished = false;
     private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
     private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty();
     public volatile long delayedDeliveryTickTimeMillis = 1000;
@@ -1516,6 +1527,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgThroughputIn / stats.msgRateIn);
         stats.msgInCounter = getMsgInCounter();
         stats.bytesInCounter = getBytesInCounter();
+        stats.msgChunkPublished = this.msgChunkPublished;
 
         subscriptions.forEach((name, subscription) -> {
             SubscriptionStats subStats = subscription.getStats(getPreciseBacklog);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 7d1935a..99822e9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -103,6 +103,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
                 any(EntryBatchIndexesAcks.class),
                 anyInt(),
                 anyLong(),
+                anyLong(),
                 any(RedeliveryTracker.class)
         );
 
@@ -146,6 +147,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
                 any(EntryBatchIndexesAcks.class),
                 totalMessagesCaptor.capture(),
                 anyLong(),
+                anyLong(),
                 any(RedeliveryTracker.class)
         );
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
new file mode 100644
index 0000000..80dd173
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.junit.Assert.assertNotEquals;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.MessageImpl.SchemaState;
+import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder;
+import org.apache.pulsar.common.policies.data.PublisherStats;
+import org.apache.pulsar.common.protocol.ByteBufPair;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.Commands.ChecksumType;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+public class MessageChunkingTest extends ProducerConsumerBase {
+    private static final Logger log = LoggerFactory.getLogger(MessageChunkingTest.class);
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup(); // inherited set-up hook; @BeforeMethod runs it before every test method
+        super.producerBaseSetup(); // inherited producer/consumer base set-up
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup(); // inherited tear-down hook; @AfterMethod runs it after every test method
+    }
+
+    @Test
+    public void testInvalidConfig() throws Exception {
+        final String topicName = "persistent://my-property/my-ns/my-topic1";
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
+        // Batching and chunking can't be enabled together, so building this producer must be rejected.
+        try {
+            Producer<byte[]> producer = producerBuilder.enableChunking(true).enableBatching(true).create();
+            fail("producer creation should have fail");
+        } catch (IllegalArgumentException ie) {
+            // Expected: the invalid combination surfaces as an IllegalArgumentException.
+        }
+    }
+
+    @Test
+    public void testLargeMessage() throws Exception {
+        // Round trip with chunking enabled: publish, consume, ack, then check cursor position and permits.
+        log.info("-- Starting {} test --", methodName);
+        this.conf.setMaxMessageSize(5);
+        final int totalMessages = 5;
+        final String topicName = "persistent://my-property/my-ns/my-topic1";
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
+
+        Producer<byte[]> producer = producerBuilder.compressionType(CompressionType.LZ4).enableChunking(true)
+                .enableBatching(false).create();
+
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
+        // Payloads are 0, 10, 20, 30 and 40 random digits long (i * 10).
+        List<String> publishedMessages = Lists.newArrayList();
+        for (int i = 0; i < totalMessages; i++) {
+            String message = createMessagePayload(i * 10);
+            publishedMessages.add(message);
+            producer.send(message.getBytes());
+        }
+        // Each received payload is checked against the payload published at the same index.
+        Message<byte[]> msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        List<MessageId> msgIds = Lists.newArrayList();
+        for (int i = 0; i < totalMessages; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.info("[{}] - Published [{}] Received message: [{}]", i, publishedMessages.get(i), receivedMessage);
+            String expectedMessage = publishedMessages.get(i);
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+            msgIds.add(msg.getMessageId());
+        }
+
+        pulsar.getBrokerService().updateRates();
+
+        PublisherStats producerStats = topic.getStats(false).publishers.get(0);
+        // The first publisher's stats must report a non-zero chunked-message rate.
+        assertTrue(producerStats.chunkedMessageRate > 0);
+
+        ManagedCursorImpl mcursor = (ManagedCursorImpl) topic.getManagedLedger().getCursors().iterator().next();
+        PositionImpl readPosition = (PositionImpl) mcursor.getReadPosition();
+
+        for (MessageId msgId : msgIds) {
+            consumer.acknowledge(msgId);
+        }
+        // Wait (via retryStrategically) until the position after mark-delete equals the read position.
+        retryStrategically((test) -> {
+            return mcursor.getMarkDeletedPosition().getNext().equals(readPosition);
+        }, 5, 200);
+
+        assertEquals(readPosition, mcursor.getMarkDeletedPosition().getNext());
+        // The consumer's available permits are expected to equal the read position's entry id.
+        assertEquals(readPosition.getEntryId(), ((ConsumerImpl) consumer).getAvailablePermits());
+
+        consumer.close();
+        producer.close();
+        log.info("-- Exiting {} test --", methodName);
+
+    }
+
+    @Test
+    public void testLargeMessageAckTimeOut() throws Exception {
+        // Receives all messages without acking, waits for the un-acked tracker to empty, then expects them again.
+        log.info("-- Starting {} test --", methodName);
+        this.conf.setMaxMessageSize(5);
+        final int totalMessages = 5;
+        final String topicName = "persistent://my-property/my-ns/my-topic1";
+
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName("my-subscriber-name").acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+                .ackTimeout(5, TimeUnit.SECONDS).subscribe();
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
+
+        Producer<byte[]> producer = producerBuilder.enableChunking(true).enableBatching(false).create();
+
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
+
+        List<String> publishedMessages = Lists.newArrayList();
+        for (int i = 0; i < totalMessages; i++) {
+            String message = createMessagePayload(i * 10);
+            publishedMessages.add(message);
+            producer.send(message.getBytes());
+        }
+        // First pass: receive every message but deliberately do not acknowledge any of them.
+        Message<byte[]> msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < totalMessages; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.info("Received message: [{}]", receivedMessage);
+            String expectedMessage = publishedMessages.get(i);
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Wait until the un-acked message tracker no longer holds any message id (ack timeout is 5 seconds).
+        retryStrategically((test) -> consumer.getUnAckedMessageTracker().messageIdPartitionMap.isEmpty(), 10,
+                TimeUnit.SECONDS.toMillis(1));
+        // Second pass: the same payloads are expected again, in the same order.
+        msg = null;
+        messageSet.clear();
+        MessageId lastMsgId = null;
+        for (int i = 0; i < totalMessages; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            lastMsgId = msg.getMessageId();
+            String receivedMessage = new String(msg.getData());
+            log.info("Received message: [{}]", receivedMessage);
+            String expectedMessage = publishedMessages.get(i);
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+
+        ManagedCursorImpl mcursor = (ManagedCursorImpl) topic.getManagedLedger().getCursors().iterator().next();
+        PositionImpl readPosition = (PositionImpl) mcursor.getReadPosition();
+        // A single cumulative ack on the last message id is expected to move mark-delete up to the read position.
+        consumer.acknowledgeCumulative(lastMsgId);
+
+        retryStrategically((test) -> {
+            return mcursor.getMarkDeletedPosition().getNext().equals(readPosition);
+        }, 5, 200);
+
+        assertEquals(readPosition, mcursor.getMarkDeletedPosition().getNext());
+
+        consumer.close();
+        producer.close();
+        log.info("-- Exiting {} test --", methodName);
+
+    }
+
+    @Test
+    public void testPublishWithFailure() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        this.conf.setMaxMessageSize(5);
+        final String topicName = "persistent://my-property/my-ns/my-topic1";
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
+
+        Producer<byte[]> producer = producerBuilder.enableChunking(true).enableBatching(false)
+                .create();
+        // Stop the broker after the producer is created so the following publish cannot complete.
+        stopBroker();
+        // Sending a 100-digit payload is expected to fail with a TimeoutException.
+        try {
+            producer.send(createMessagePayload(100).getBytes());
+            fail("should have failed with timeout exception");
+        } catch (PulsarClientException.TimeoutException e) {
+            // Expected outcome.
+        }
+        producer.close();
+    }
+
+    @Test(enabled = false)
+    public void testMaxPendingChunkMessages() throws Exception {
+
+        log.info("-- Starting {} test --", methodName);
+        this.conf.setMaxMessageSize(10);
+        final int totalMessages = 25;
+        final String topicName = "persistent://my-property/my-ns/maxPending";
+        final int totalProducers = 25;
+        ExecutorService executor = Executors.newFixedThreadPool(totalProducers);
+
+        try {
+            ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
+                    .subscriptionName("my-subscriber-name").acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+                    .maxPendingChuckedMessage(1).autoAckOldestChunkedMessageOnQueueFull(true)
+                    .ackTimeout(5, TimeUnit.SECONDS).subscribe();
+
+            ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
+
+            Producer<byte[]>[] producers = new Producer[totalProducers];
+            int totalPublishedMessages = totalProducers;
+            List<CompletableFuture<MessageId>> futures = Lists.newArrayList();
+            for (int i = 0; i < totalProducers; i++) {
+                producers[i] = producerBuilder.enableChunking(true).enableBatching(false).create();
+                int index = i;
+                executor.submit(() -> {
+                    futures.add(producers[index].sendAsync(createMessagePayload(45).getBytes()));
+                });
+            }
+
+            FutureUtil.waitForAll(futures).get();
+            PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
+
+            Message<byte[]> msg = null;
+            Set<String> messageSet = Sets.newHashSet();
+            for (int i = 0; i < totalMessages; i++) {
+                msg = consumer.receive(1, TimeUnit.SECONDS);
+                if (msg == null) {
+                    break;
+                }
+                String receivedMessage = new String(msg.getData());
+                log.info("Received message: [{}]", receivedMessage);
+                messageSet.add(receivedMessage);
+                consumer.acknowledge(msg);
+            }
+
+            assertNotEquals(messageSet.size(), totalPublishedMessages);
+        } finally {
+            executor.shutdown();
+        }
+
+    }
+
+    /**
+     * Validates that a producer cannot be created with both chunking and batching enabled.
+     * NOTE(review): the non-persistent-topic restriction is not exercised here; only a persistent topic is used.
+     * @throws Exception if the test hits an unexpected failure
+     */
+    @Test
+    public void testInvalidUseCaseForChunking() throws Exception {
+
+        log.info("-- Starting {} test --", methodName);
+        this.conf.setMaxMessageSize(5);
+        final String topicName = "persistent://my-property/my-ns/my-topic1";
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
+
+        try {
+            Producer<byte[]> producer = producerBuilder.enableChunking(true).enableBatching(true).create();
+            fail("it should have failied because chunking can't be used with batching enabled");
+        } catch (IllegalArgumentException ie) {
+            // Expected: chunking together with batching is rejected.
+        }
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    @Test
+    public void testExpireIncompleteChunkMessage() throws Exception{
+        final String topicName = "persistent://prop/use/ns-abc/expireMsg";
+
+        // 1. connect a producer and read its private producerId through reflection
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .sendTimeout(10, TimeUnit.MINUTES)
+            .create();
+        Field producerIdField = ProducerImpl.class.getDeclaredField("producerId");
+        producerIdField.setAccessible(true);
+        long producerId = (long) producerIdField.get(producer);
+        producer.cnx().registerProducer(producerId, producer); // (re-)register this producer on its connection
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName("my-sub").subscribe();
+        // 2. hand-build a send command whose metadata marks it as chunk 0 of a 2-chunk message
+        TypedMessageBuilderImpl<byte[]> msg = (TypedMessageBuilderImpl<byte[]>) producer.newMessage().value("message-1".getBytes());
+        ByteBuf payload = Unpooled.wrappedBuffer(msg.getContent());
+        Builder metadataBuilder = ((TypedMessageBuilderImpl<byte[]>) msg).getMetadataBuilder();
+        MessageMetadata msgMetadata = metadataBuilder.setProducerName("test").setSequenceId(1).setPublishTime(10L)
+                .setUuid("123").setNumChunksFromMsg(2).setChunkId(0).setTotalChunkMsgSize(100).build();
+        ByteBufPair cmd = Commands.newSend(producerId, 1, 1, ChecksumType.Crc32c, msgMetadata, payload);
+        MessageImpl msgImpl = ((MessageImpl<byte[]>) msg.getMessage());
+        msgImpl.setSchemaState(SchemaState.Ready);
+        OpSendMsg op = OpSendMsg.create(msgImpl, cmd, 1, null);
+        producer.processOpSendMsg(op);
+        // 3. only one of the two chunks was sent, so the consumer should hold exactly one incomplete message
+        retryStrategically((test) -> {
+            return consumer.chunkedMessagesMap.size() > 0;
+        }, 5, 500);
+        assertEquals(consumer.chunkedMessagesMap.size(), 1);
+        // 4. shrink the expiry to 1 ms, sleep past it, and trigger the expiry pass by hand
+        consumer.expireTimeOfIncompleteChunkedMessageMillis = 1;
+        Thread.sleep(10);
+        consumer.removeExpireIncompleteChunkedMessages();
+        assertEquals(consumer.chunkedMessagesMap.size(), 0);
+        // 5. release client resources
+        producer.close();
+        consumer.close();
+        producer = null; // drops the local reference; NOTE(review): no mock or spy is created in this test
+    }
+    
+    private String createMessagePayload(int size) {
+        StringBuilder str = new StringBuilder();
+        Random rand = new Random();
+        for (int i = 0; i < size; i++) {
+            str.append(rand.nextInt(10));
+        }
+        return str.toString();
+    }
+
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 461ba70..def1807 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -609,4 +609,49 @@ public interface ConsumerBuilder<T> extends Cloneable {
      */
     ConsumerBuilder<T> enableRetry(boolean retryEnable);
 
+    /**
+     * Sets the threshold on the number of incomplete chunked messages the consumer buffers.
+     *
+     * <p>The consumer buffers chunks in memory until it has received all the chunks of the original message. Chunks
+     * of one message might not be contiguous in the stream and can be interleaved with chunks of other messages, so
+     * the consumer has to maintain multiple buffers to manage chunks coming from different messages. This mainly
+     * happens when multiple publishers publish concurrently, or when a publisher failed to publish all the chunks.
+     *
+     * <pre>
+     * eg: M1-C1, M2-C1, M1-C2, M2-C2
+     * Here, chunks M1-C1 and M1-C2 belong to original message M1, and chunks M2-C1 and M2-C2 belong to M2.
+     * </pre>
+     * Buffering a large number of incomplete chunked messages can create memory pressure, which this threshold
+     * guards against. Once the consumer reaches the threshold, it drops the outstanding incomplete chunked messages,
+     * either by silently acking them or by marking them unacked so that the broker redelivers them later; the choice
+     * is controlled by {@link #autoAckOldestChunkedMessageOnQueueFull(boolean)}.
+     * <p>Default: 100.
+     *
+     * @param maxPendingChuckedMessage threshold on the number of buffered incomplete chunked messages
+     * @return the consumer builder instance
+     */
+    ConsumerBuilder<T> maxPendingChuckedMessage(int maxPendingChuckedMessage);
+
+    /**
+     * Buffering a large number of incomplete chunked messages can create memory pressure, which is guarded against by
+     * the {@link #maxPendingChuckedMessage(int)} threshold. Once the consumer reaches that threshold, it drops the
+     * outstanding incomplete chunked messages: by silently acking them if autoAckOldestChunkedMessageOnQueueFull is
+     * true, otherwise by marking them for redelivery.
+     *
+     * <p>Default: false.
+     *
+     * @param autoAckOldestChunkedMessageOnQueueFull whether to silently ack instead of marking for redelivery
+     * @return the consumer builder instance
+     */
+    ConsumerBuilder<T> autoAckOldestChunkedMessageOnQueueFull(boolean autoAckOldestChunkedMessageOnQueueFull);
+
+    /**
+     * If a producer fails to publish all the chunks of a message, the consumer can expire the incomplete chunks once
+     * it has not received all of them within the configured expiry time (default 1 hour).
+     *
+     * @param duration length of the expiry time
+     * @param unit time unit of {@code duration}
+     * @return the consumer builder instance
+     */
+    ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit);
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index c4a250e..f94e6c2 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -293,6 +293,33 @@ public interface ProducerBuilder<T> extends Cloneable {
     ProducerBuilder<T> enableBatching(boolean enableBatching);
 
     /**
+     * If the message size is larger than the max publish-payload size allowed by the broker, enableChunking lets the
+     * producer split the message into multiple chunks and publish them to the broker separately and in order. So, it
+     * allows the client to successfully publish large messages in pulsar.
+     *
+     * <p>This feature allows a publisher to publish a large message by splitting it into multiple chunks and lets the
+     * consumer stitch them together to form the original large published message. Therefore, it's necessary to apply
+     * the recommended configuration at the pulsar producer and consumer. Recommendations for using this feature:
+     *
+     * <pre>
+     * 1. This feature is right now only supported by non-shared subscription and persistent-topic.
+     * 2. Disable batching to use chunking feature
+     * 3. Pulsar-client keeps published messages into buffer until it receives ack from broker.
+     * So, it's better to reduce "maxPendingMessages" size to avoid producer occupying large amount
+     *  of memory by buffered messages.
+     * 4. Set message-ttl on the namespace to cleanup incomplete chunked messages.
+     * (sometimes, due to broker-restart or publish time, producer might fail to publish entire large message
+     * so, consumer will not be able to consume and ack those messages. So, those messages can
+     * be only discarded by msg ttl) Or configure
+     * {@link ConsumerBuilder#expireTimeOfIncompleteChunkedMessage(long, java.util.concurrent.TimeUnit)}
+     * 5. Consumer configuration: consumer should also configure receiverQueueSize and maxPendingChuckedMessage
+     * </pre>
+     * @param enableChunking whether the producer may split oversized messages into chunks
+     * @return the producer builder instance
+     */
+    ProducerBuilder<T> enableChunking(boolean enableChunking);
+
+    /**
      * Sets a {@link CryptoKeyReader}.
      *
      * <p>Configure the key reader to be used to encrypt the message payloads.
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index 673a45e..1812a96 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -101,7 +101,17 @@ public class CmdConsume {
 
     @Parameter(names = { "--regex" }, description = "Indicate the topic name is a regex pattern")
     private boolean isRegex = false;
+    
+    @Parameter(names = { "-q", "--queue-size" }, description = "Consumer receiver queue size.")
+    private int receiverQueueSize = 0;
 
+    @Parameter(names = { "-mc", "--max_chunked_msg" }, description = "Max pending chunk messages")
+    private int maxPendingChuckedMessage = 0;
+
+    @Parameter(names = { "-ac",
+            "--auto_ack_chunk_q_full" }, description = "Auto ack for oldest message on queue is full")
+    private boolean autoAckOldestChunkedMessageOnQueueFull = false;
+    
     private ClientBuilder clientBuilder;
     private Authentication authentication;
     private String serviceURL;
@@ -195,7 +205,16 @@ public class CmdConsume {
                 builder.topic(topic);
             }
 
-            Consumer<byte[]> consumer = builder.subscribe();
+            ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer().topic(topic); // NOTE(review): the 'builder' prepared above is not reused here - confirm
+            if (this.maxPendingChuckedMessage > 0) {
+                consumerBuilder.maxPendingChuckedMessage(this.maxPendingChuckedMessage);
+            }
+            if (this.receiverQueueSize > 0) {
+                consumerBuilder.receiverQueueSize(this.receiverQueueSize); // -q/--queue-size sets the receiver queue, not the chunk limit
+            }
+            Consumer<byte[]> consumer = consumerBuilder.subscriptionName(this.subscriptionName)
+                    .autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull)
+                    .subscriptionType(subscriptionType).subscribe();
 
             RateLimiter limiter = (this.consumeRate > 0) ? RateLimiter.create(this.consumeRate) : null;
             while (this.numMessagesToConsume == 0 || numMessagesConsumed < this.numMessagesToConsume) {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index f143ef3..9b67a08 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -44,7 +44,9 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
@@ -94,6 +96,10 @@ public class CmdProduce {
                description = "Rate (in msg/sec) at which to produce," +
                        " value 0 means to produce messages as fast as possible.")
     private double publishRate = 0;
+    
+    @Parameter(names = { "-c",
+            "--chunking" }, description = "Should split the message and publish in chunks if message size is larger than allowed max size")
+    private boolean chunkingAllowed = false;
 
     @Parameter(names = { "-s", "--separator" },
                description = "Character to split messages string on default is comma")
@@ -197,7 +203,12 @@ public class CmdProduce {
 
         try {
             PulsarClient client = clientBuilder.build();
-            Producer<byte[]> producer = client.newProducer().topic(topic).create();
+            ProducerBuilder<byte[]> producerBuilder = client.newProducer().topic(topic);
+            if (this.chunkingAllowed) {
+                producerBuilder.enableChunking(true);
+                producerBuilder.enableBatching(false);
+            }
+            Producer<byte[]> producer = producerBuilder.create();
 
             List<byte[]> messageBodies = generateMessageBodies(this.messages, this.messageFileNames);
             RateLimiter limiter = (this.publishRate > 0) ? RateLimiter.create(this.publishRate) : null;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index a0e6acd..3d45931 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,6 +68,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected final ConsumerEventListener consumerEventListener;
     protected final ExecutorService listenerExecutor;
     final BlockingQueue<Message<T>> incomingMessages;
+    protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> unAckedChunckedMessageIdSequenceMap;
     protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
     protected int maxReceiverQueueSize;
     protected final Schema<T> schema;
@@ -91,6 +93,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         this.consumerEventListener = conf.getConsumerEventListener();
         // Always use growable queue since items can exceed the advertised size
         this.incomingMessages = new GrowableArrayBlockingQueue<>();
+        this.unAckedChunckedMessageIdSequenceMap = new ConcurrentOpenHashMap<>();
 
         this.listenerExecutor = listenerExecutor;
         this.pendingReceives = Queues.newConcurrentLinkedQueue();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index a1ef384..9240ab3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -280,6 +280,18 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
     }
 
     @Override
+    public ConsumerBuilder<T> maxPendingChuckedMessage(int maxPendingChuckedMessage) {
+        conf.setMaxPendingChuckedMessage(maxPendingChuckedMessage); // stored on the consumer config without validation
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder<T> autoAckOldestChunkedMessageOnQueueFull(boolean autoAckOldestChunkedMessageOnQueueFull) {
+        conf.setAutoAckOldestChunkedMessageOnQueueFull(autoAckOldestChunkedMessageOnQueueFull); // stored on the consumer config
+        return this;
+    }
+    
+    @Override
     public ConsumerBuilder<T> property(String key, String value) {
         checkArgument(StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value),
                 "property key/value cannot be blank");
@@ -403,4 +415,10 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
         return this;
     }
 
+    @Override
+    public ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit) {
+        conf.setExpireTimeOfIncompleteChunkedMessageMillis(unit.toMillis(duration)); // the config stores milliseconds
+        return this; // return the builder, like the other setters, so chained calls do not hit a null
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 662222f..d573d82 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -25,10 +25,14 @@ import static org.apache.pulsar.common.protocol.Commands.readChecksum;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
-
+import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.util.Timeout;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import io.netty.util.ReferenceCountUtil;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -36,6 +40,8 @@ import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -43,6 +49,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -98,6 +105,8 @@ import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.SafeCollectionUtils;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -165,6 +174,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock();
     
     protected volatile boolean paused;
+    
+    protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentOpenHashMap<>();
+    private int pendingChunckedMessageCount = 0;
+    protected long expireTimeOfIncompleteChunkedMessageMillis = 0;
+    private boolean expireChunkMessageTaskScheduled = false;
+    private int maxPendingChuckedMessage;
+    // if the queue size is reasonable (most of the time equal to the number of producers publishing concurrently on
+    // the topic) then it guards against a broken chunked message which was not fully published
+    private boolean autoAckOldestChunkedMessageOnQueueFull;
+    // it will be used to manage N outstanding chunked message buffers
+    private final BlockingQueue<String> pendingChunckedMessageUuidQueue;
 
     private final boolean createTopicIfDoesNotExist;
 
@@ -214,6 +234,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         this.negativeAcksTracker = new NegativeAcksTracker(this, conf);
         this.resetIncludeHead = conf.isResetIncludeHead();
         this.createTopicIfDoesNotExist = createTopicIfDoesNotExist;
+        this.maxPendingChuckedMessage = conf.getMaxPendingChuckedMessage();
+        this.pendingChunckedMessageUuidQueue = new GrowableArrayBlockingQueue<>();
+        this.expireTimeOfIncompleteChunkedMessageMillis = conf.getExpireTimeOfIncompleteChunkedMessageMillis();
+        this.autoAckOldestChunkedMessageOnQueueFull = conf.isAutoAckOldestChunkedMessageOnQueueFull();
 
         if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
             stats = new ConsumerStatsRecorderImpl(client, conf, this);
@@ -1012,6 +1036,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         }
 
         final int numMessages = msgMetadata.getNumMessagesInBatch();
+        final boolean isChunkedMessage = msgMetadata.getNumChunksFromMsg() > 1 && conf.getSubscriptionType() != SubscriptionType.Shared;
 
         MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex());
         if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
@@ -1034,8 +1059,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         }
 
         // uncompress decryptedPayload and release decryptedPayload-ByteBuf
-        ByteBuf uncompressedPayload = isMessageUndecryptable ? decryptedPayload.retain()
-                : uncompressPayloadIfNeeded(messageId, msgMetadata, decryptedPayload, cnx);
+        ByteBuf uncompressedPayload = (isMessageUndecryptable || isChunkedMessage) ? decryptedPayload.retain()
+                : uncompressPayloadIfNeeded(messageId, msgMetadata, decryptedPayload, cnx, true);
         decryptedPayload.release();
         if (uncompressedPayload == null) {
             // Message was discarded on decompression error
@@ -1045,6 +1070,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         // if message is not decryptable then it can't be parsed as a batch-message. so, add EncyrptionCtx to message
         // and return undecrypted payload
         if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {
+            
+            // right now, chunked messages are only supported by non-shared subscription
+            if (isChunkedMessage) {
+                uncompressedPayload = processMessageChunk(uncompressedPayload, msgMetadata, msgId, messageId, cnx);
+                if (uncompressedPayload == null) {
+                    msgMetadata.recycle();
+                    return;
+                }
+            }
 
             if (isSameEntry(messageId) && isPriorEntryIndex(messageId.getEntryId())) {
                 // We need to discard entries that were prior to startMessageId
@@ -1094,6 +1128,88 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         }
     }
 
+    /**
+     * Buffers one chunk of a chunked message and, once the last chunk has arrived, stitches the chunks together
+     * and uncompresses the complete message.
+     *
+     * <p>Chunks of one message share a uuid and have to arrive strictly in order. An unexpected chunk drops
+     * everything buffered for that uuid; the chunk itself is acked when it is older than the configured expiry
+     * time and is handed to {@link #trackMessage(MessageId)} otherwise.
+     *
+     * @param compressedPayload payload of this chunk; it is released on every path through this method
+     * @param msgMetadata metadata of this chunk (uuid, chunk-id, number of chunks, total message size)
+     * @param msgId client-side message-id of this chunk
+     * @param messageId message-id of this chunk as received, forwarded to the uncompress step
+     * @param cnx connection the chunk was received on
+     * @return the uncompressed payload of the whole message after its last chunk, or {@code null} while the
+     *         message is incomplete or when the chunk was discarded
+     */
+    private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata msgMetadata, MessageIdImpl msgId,
+            MessageIdData messageId, ClientCnx cnx) {
+        final String uuid = msgMetadata.getUuid();
+        final int chunkId = msgMetadata.getChunkId();
+        final int totalChunks = msgMetadata.getNumChunksFromMsg();
+
+        // Lazy task scheduling to expire incomplete chunk message
+        if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) {
+            ((ScheduledExecutorService) listenerExecutor).scheduleAtFixedRate(() -> {
+                removeExpireIncompleteChunkedMessages();
+            }, expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
+                    TimeUnit.MILLISECONDS);
+            expireChunkMessageTaskScheduled = true;
+        }
+
+        // First chunk of a message that is not being assembled yet: register a context for it. The buffer is
+        // allocated only when a context is really created, so a redelivered first chunk can neither leak a buffer
+        // nor be counted/queued twice.
+        if (chunkId == 0 && chunkedMessagesMap.get(uuid) == null) {
+            final int totalSize = msgMetadata.getTotalChunkMsgSize();
+            chunkedMessagesMap.computeIfAbsent(uuid,
+                    (key) -> ChunkedMessageCtx.get(totalChunks, Unpooled.directBuffer(totalSize, totalSize)));
+            pendingChunckedMessageCount++;
+            if (maxPendingChuckedMessage > 0 && pendingChunckedMessageCount > maxPendingChuckedMessage) {
+                removeOldestPendingChunkedMessage();
+            }
+            pendingChunckedMessageUuidQueue.add(uuid);
+        }
+
+        ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(uuid);
+        // discard message if chunk is out-of-order; the chunk-id is bounded by the number of chunks the context
+        // was created for (the size of its message-id array), not by the message size in bytes
+        if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
+                || chunkId != (chunkedMsgCtx.lastChunkedMessageId + 1)
+                || chunkId >= chunkedMsgCtx.totalChunks) {
+            // means we lost the first chunk: should never happen
+            log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}, total-chunks {}", msgId,
+                    (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), chunkId, totalChunks);
+            if (chunkedMsgCtx != null) {
+                // the context is dropped, so it must stop counting as a pending chunked message
+                chunkedMessagesMap.remove(uuid);
+                pendingChunckedMessageUuidQueue.remove(uuid);
+                pendingChunckedMessageCount--;
+                if (chunkedMsgCtx.chunkedMsgBuffer != null) {
+                    ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+                }
+                chunkedMsgCtx.recycle();
+            }
+            compressedPayload.release();
+            increaseAvailablePermits(cnx);
+            if (expireTimeOfIncompleteChunkedMessageMillis > 0
+                    && System.currentTimeMillis() > (msgMetadata.getPublishTime()
+                            + expireTimeOfIncompleteChunkedMessageMillis)) {
+                doAcknowledge(msgId, AckType.Individual, Collections.emptyMap(), null);
+            } else {
+                trackMessage(msgId);
+            }
+            return null;
+        }
+
+        chunkedMsgCtx.chunkedMessageIds[chunkId] = msgId;
+        // append the chunked payload and update lastChunkedMessage-id
+        chunkedMsgCtx.chunkedMsgBuffer.writeBytes(compressedPayload);
+        chunkedMsgCtx.lastChunkedMessageId = chunkId;
+
+        // if final chunk is not received yet then release payload and return
+        if (chunkId != (totalChunks - 1)) {
+            compressedPayload.release();
+            increaseAvailablePermits(cnx);
+            return null;
+        }
+
+        // last chunk received: so, stitch chunked-messages and clear up chunkedMsgBuffer
+        if (log.isDebugEnabled()) {
+            log.debug("Chunked message completed chunkId {}, total-chunks {}, msgId {} sequenceId {}",
+                    chunkId, totalChunks, msgId, msgMetadata.getSequenceId());
+        }
+        // remove the buffer from the map and the uuid from the pending queue (a stale uuid would pile up there),
+        // add the chunked message-ids to the unack-message tracker, and reduce the pending-chunked-message count
+        chunkedMessagesMap.remove(uuid);
+        pendingChunckedMessageUuidQueue.remove(uuid);
+        unAckedChunckedMessageIdSequenceMap.put(msgId, chunkedMsgCtx.chunkedMessageIds);
+        pendingChunckedMessageCount--;
+        compressedPayload.release();
+        // take the stitched buffer out of the context before the context goes back to the pool
+        ByteBuf stitchedPayload = chunkedMsgCtx.chunkedMsgBuffer;
+        chunkedMsgCtx.recycle();
+        ByteBuf uncompressedPayload = uncompressPayloadIfNeeded(messageId, msgMetadata, stitchedPayload, cnx, false);
+        stitchedPayload.release();
+        return uncompressedPayload;
+    }
+    
     protected void triggerListener(int numMessages) {
         // Trigger the notification on the message listener in a separate thread to avoid blocking the networking
         // thread while the message processing happens
@@ -1307,19 +1423,23 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
+    /**
+     * Tracks the message-id of the given message; a {@code null} message is ignored.
+     *
+     * @param msg message whose id is passed on to {@code trackMessage(MessageId)}
+     */
     protected void trackMessage(Message<?> msg) {
         if (msg != null) {
-            MessageId messageId = msg.getMessageId();
-            if (conf.getAckTimeoutMillis() > 0 && messageId instanceof MessageIdImpl) {
-                MessageIdImpl id = (MessageIdImpl)messageId;
-                if (id instanceof BatchMessageIdImpl) {
-                    // do not add each item in batch message into tracker
-                    id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex());
-                }
-                if (hasParentConsumer) {
-                    // we should no longer track this message, TopicsConsumer will take care from now onwards
-                    unAckedMessageTracker.remove(id);
-                } else {
-                    unAckedMessageTracker.add(id);
-                }
+            trackMessage(msg.getMessageId());
+        }
+    }
+
+    /**
+     * Adds a message-id to, or removes it from, the unacked-message tracker.
+     *
+     * <p>Nothing happens unless an ack timeout is configured and the id is a {@code MessageIdImpl}. A batch
+     * message-id is replaced by the id of its entry, so a batch is tracked once. When this consumer has a parent
+     * consumer the id is removed from the tracker, otherwise it is added.
+     *
+     * @param messageId id of the message to track
+     */
+    protected void trackMessage(MessageId messageId) {
+        if (conf.getAckTimeoutMillis() > 0 && messageId instanceof MessageIdImpl) {
+            MessageIdImpl id = (MessageIdImpl)messageId;
+            if (id instanceof BatchMessageIdImpl) {
+                // do not add each item in batch message into tracker
+                id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex());
+            }
+            if (hasParentConsumer) {
+                //TODO: check parent consumer here
+                // we should no longer track this message, TopicsConsumer will take care from now onwards
+                unAckedMessageTracker.remove(id);
+            } else {
+                unAckedMessageTracker.add(id);
             }
         }
     }
@@ -1412,19 +1532,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     }
 
     private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, MessageMetadata msgMetadata, ByteBuf payload,
-            ClientCnx currentCnx) {
+            ClientCnx currentCnx, boolean checkMaxMessageSize) {
         CompressionType compressionType = msgMetadata.getCompression();
         CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
         int uncompressedSize = msgMetadata.getUncompressedSize();
         int payloadSize = payload.readableBytes();
-        if (payloadSize > ClientCnx.getMaxMessageSize()) {
+        if (checkMaxMessageSize && payloadSize > ClientCnx.getMaxMessageSize()) {
             // payload size is itself corrupted since it cannot be bigger than the MaxMessageSize
             log.error("[{}][{}] Got corrupted payload message size {} at {}", topic, subscription, payloadSize,
-                      messageId);
+                    messageId);
             discardCorruptedMessage(messageId, currentCnx, ValidationError.UncompressedSizeCorruption);
             return null;
         }
-
         try {
             ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);
             return uncompressedPayload;
@@ -2035,6 +2154,99 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         return topicNameWithoutPartition;
     }
 
+    /**
+     * Recyclable holder for the state of a chunked message while its chunks are still being collected.
+     */
+    static class ChunkedMessageCtx {
+
+        private static final Recycler<ChunkedMessageCtx> RECYCLER = new Recycler<ChunkedMessageCtx>() {
+            @Override
+            protected ChunkedMessageCtx newObject(Recycler.Handle<ChunkedMessageCtx> handle) {
+                return new ChunkedMessageCtx(handle);
+            }
+        };
+
+        // number of chunks of the message; -1 while the context is not in use
+        protected int totalChunks = -1;
+        // buffer the chunk payloads are appended to; recycle() only drops the reference, it does not release it
+        protected ByteBuf chunkedMsgBuffer;
+        // chunk-id of the chunk that was appended last; -1 before the first chunk
+        protected int lastChunkedMessageId = -1;
+        // message-ids of the received chunks, one slot per chunk
+        protected MessageIdImpl[] chunkedMessageIds;
+        // System.currentTimeMillis() at the moment the context was handed out by get()
+        protected long receivedTime = 0;
+
+        private final Handle<ChunkedMessageCtx> recyclerHandle;
+
+        private ChunkedMessageCtx(Handle<ChunkedMessageCtx> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        /**
+         * Takes a context from the recycler and initializes it for a new chunked message.
+         *
+         * @param numChunksFromMsg number of chunks the message consists of
+         * @param chunkedMsgBuffer buffer that will receive the chunk payloads
+         * @return the initialized context
+         */
+        static ChunkedMessageCtx get(int numChunksFromMsg, ByteBuf chunkedMsgBuffer) {
+            ChunkedMessageCtx ctx = RECYCLER.get();
+            ctx.totalChunks = numChunksFromMsg;
+            ctx.chunkedMsgBuffer = chunkedMsgBuffer;
+            ctx.chunkedMessageIds = new MessageIdImpl[numChunksFromMsg];
+            ctx.receivedTime = System.currentTimeMillis();
+            return ctx;
+        }
+
+        /**
+         * Resets every field and returns the context to the recycler. Callers that still need the buffer or the
+         * message-id array must read them before calling this method.
+         */
+        public void recycle() {
+            this.totalChunks = -1;
+            this.chunkedMsgBuffer = null;
+            this.lastChunkedMessageId = -1;
+            // also drop the id array and the timestamp so a pooled instance holds no stale state or references
+            this.chunkedMessageIds = null;
+            this.receivedTime = 0;
+            recyclerHandle.recycle(this);
+        }
+    }
+
+    /**
+     * Polls uuids from the pending queue until one is found that still has a buffered context (or the queue is
+     * empty) and removes that chunked message; its chunks are acked or tracked depending on
+     * {@code autoAckOldestChunkedMessageOnQueueFull}.
+     */
+    private void removeOldestPendingChunkedMessage() {
+        String candidateUuid = null;
+        ChunkedMessageCtx candidateCtx = null;
+        while (!pendingChunckedMessageUuidQueue.isEmpty()) {
+            // remove oldest pending chunked-message group and free memory
+            candidateUuid = pendingChunckedMessageUuidQueue.poll();
+            if (StringUtils.isNotBlank(candidateUuid)) {
+                candidateCtx = chunkedMessagesMap.get(candidateUuid);
+            }
+            if (candidateCtx != null) {
+                break;
+            }
+        }
+        // a null context (nothing found) is ignored by removeChunkMessage
+        removeChunkMessage(candidateUuid, candidateCtx, this.autoAckOldestChunkedMessageOnQueueFull);
+    }
+
+    /**
+     * Removes and acks the incomplete chunked messages at the head of the pending queue whose context is older
+     * than {@code expireTimeOfIncompleteChunkedMessageMillis}. Does nothing when no expiry time is configured.
+     *
+     * <p>Uuids are queued when the first chunk of a message is processed, so the scan stops at the first entry
+     * that has not expired yet.
+     */
+    protected void removeExpireIncompleteChunkedMessages() {
+        if (expireTimeOfIncompleteChunkedMessageMillis <= 0) {
+            return;
+        }
+        String messageUUID;
+        while ((messageUUID = pendingChunckedMessageUuidQueue.peek()) != null) {
+            ChunkedMessageCtx chunkedMsgCtx = StringUtils.isNotBlank(messageUUID)
+                    ? chunkedMessagesMap.get(messageUUID)
+                    : null;
+            if (chunkedMsgCtx == null) {
+                // stale entry: the message behind this uuid is no longer buffered. Drop it, otherwise it stays
+                // at the head of the queue forever and no later message can ever expire.
+                pendingChunckedMessageUuidQueue.remove(messageUUID);
+                continue;
+            }
+            if (System.currentTimeMillis() <= (chunkedMsgCtx.receivedTime
+                    + expireTimeOfIncompleteChunkedMessageMillis)) {
+                // head has not expired yet
+                return;
+            }
+            pendingChunckedMessageUuidQueue.remove(messageUUID);
+            removeChunkMessage(messageUUID, chunkedMsgCtx, true);
+        }
+    }
+
+    /**
+     * Drops a pending chunked message: removes it from the map, acks (or tracks) every chunk received so far,
+     * releases its buffer, recycles the context and decrements the pending count.
+     *
+     * @param msgUUID uuid of the chunked message
+     * @param chunkedMsgCtx context of the message; {@code null} makes the call a no-op
+     * @param autoAck {@code true} to ack the received chunks, {@code false} to pass them to trackMessage
+     */
+    private void removeChunkMessage(String msgUUID, ChunkedMessageCtx chunkedMsgCtx, boolean autoAck) {
+        if (chunkedMsgCtx != null) {
+            // clean up pending chuncked-Message
+            chunkedMessagesMap.remove(msgUUID);
+            MessageIdImpl[] receivedIds = chunkedMsgCtx.chunkedMessageIds;
+            int idCount = (receivedIds == null) ? 0 : receivedIds.length;
+            for (int i = 0; i < idCount; i++) {
+                MessageIdImpl receivedId = receivedIds[i];
+                if (receivedId != null) {
+                    if (!autoAck) {
+                        trackMessage(receivedId);
+                    } else {
+                        log.info("Removing chunk message-id {}", receivedId);
+                        doAcknowledge(receivedId, AckType.Individual, Collections.emptyMap(), null);
+                    }
+                }
+            }
+            ByteBuf pendingBuffer = chunkedMsgCtx.chunkedMsgBuffer;
+            if (pendingBuffer != null) {
+                pendingBuffer.release();
+            }
+            chunkedMsgCtx.recycle();
+            pendingChunckedMessageCount--;
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);
 
-}
+}
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index ea5dc67..f3147ce 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -618,7 +618,10 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     public void redeliverUnacknowledgedMessages() {
         lock.writeLock().lock();
         try {
-            consumers.values().stream().forEach(consumer -> consumer.redeliverUnacknowledgedMessages());
+            consumers.values().stream().forEach(consumer -> {
+                consumer.redeliverUnacknowledgedMessages();
+                consumer.unAckedChunckedMessageIdSequenceMap.clear();
+            });
             incomingMessages.clear();
             INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
             unAckedMessageTracker.clear();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index da1b738..bbdd786 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import static org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequnceMap;
 
 class NegativeAcksTracker {
 
@@ -62,6 +63,7 @@ class NegativeAcksTracker {
         long now = System.nanoTime();
         nackedMessages.forEach((msgId, timestamp) -> {
             if (timestamp < now) {
+                addChunkedMessageIdsAndRemoveFromSequnceMap(msgId, messagesToRedeliver, this.consumer);
                 messagesToRedeliver.add(msgId);
             }
         });
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 927594c..937f005 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
 
@@ -185,10 +186,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
             return false;
         }
 
-        final ByteBuf cmd = Commands.newAck(consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(), null, ackType, null,
-                properties);
-
-        cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+        newAckCommand(consumer.consumerId, msgId, null, ackType, null, properties, cnx, true /* flush */);
         return true;
     }
 
@@ -226,9 +224,8 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
 
         boolean shouldFlush = false;
         if (cumulativeAckFlushRequired) {
-            ByteBuf cmd = Commands.newAck(consumer.consumerId, lastCumulativeAck.ledgerId, lastCumulativeAck.entryId, lastCumulativeAckSet,
-                    AckType.Cumulative, null, Collections.emptyMap());
-            cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+            newAckCommand(consumer.consumerId, lastCumulativeAck, lastCumulativeAckSet, AckType.Cumulative, null, Collections.emptyMap(), cnx,
+                    false /* flush */);
             shouldFlush=true;
             cumulativeAckFlushRequired = false;
         }
@@ -244,7 +241,20 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
                         break;
                     }
 
-                    entriesToAck.add(Triple.of(msgId.getLedgerId(), msgId.getEntryId(), null));
+                    // if the messageId belongs to a chunked message then all chunks related to that message were
+                    // processed as well, so ack all of them
+                    MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunckedMessageIdSequenceMap.get(msgId);
+                    if (chunkMsgIds != null && chunkMsgIds.length > 1) {
+                        for (MessageIdImpl cMsgId : chunkMsgIds) {
+                            if (cMsgId != null) {
+                                entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
+                            }
+                        }
+                        // the chunks will be acked, so remove the chunked message-id sequence
+                        this.consumer.unAckedChunckedMessageIdSequenceMap.remove(msgId);
+                    } else {
+                        entriesToAck.add(Triple.of(msgId.getLedgerId(), msgId.getEntryId(), null));
+                    }
                 }
             } else {
                 // When talking to older brokers, send the acknowledgements individually
@@ -254,8 +264,8 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
                         break;
                     }
 
-                    cnx.ctx().write(Commands.newAck(consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(), null,
-                            AckType.Individual, null, Collections.emptyMap()), cnx.ctx().voidPromise());
+                    newAckCommand(consumer.consumerId, msgId, null, AckType.Individual, null, Collections.emptyMap(),
+                            cnx, false);
                     shouldFlush = true;
                 }
             }
@@ -300,4 +310,45 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
             scheduledTask.cancel(true);
         }
     }
-}
+
+    /**
+     * Sends the ack for {@code msgId}. When the id belongs to a chunked message, every chunk of that message is
+     * acked and the chunk-id sequence is removed from the consumer.
+     *
+     * <p>The chunks are acked with one multi-message ack when the broker supports it and the ack is not
+     * cumulative; otherwise one ack command is written per chunk. If no chunk id is available the message-id
+     * itself is acked, so the ack is never dropped.
+     *
+     * @param consumerId id of the acknowledging consumer
+     * @param msgId id being acknowledged (for a chunked message: the key of its chunk-id sequence)
+     * @param lastCumulativeAckSet ack bit-set forwarded to {@code Commands.newAck}; may be {@code null}
+     * @param ackType individual or cumulative
+     * @param validationError validation error forwarded to {@code Commands.newAck}; may be {@code null}
+     * @param map properties forwarded to {@code Commands.newAck}
+     * @param cnx connection the command(s) are written to
+     * @param flush {@code true} to flush every command right away, {@code false} to only write it
+     */
+    private void newAckCommand(long consumerId, MessageIdImpl msgId, BitSetRecyclable lastCumulativeAckSet,
+            AckType ackType, ValidationError validationError, Map<String, Long> map, ClientCnx cnx, boolean flush) {
+
+        MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunckedMessageIdSequenceMap.get(msgId);
+        List<MessageIdImpl> chunkIdsToAck = new ArrayList<>();
+        if (chunkMsgIds != null) {
+            for (MessageIdImpl cMsgId : chunkMsgIds) {
+                // skip empty slots in both ack modes instead of failing with a NullPointerException
+                if (cMsgId != null) {
+                    chunkIdsToAck.add(cMsgId);
+                }
+            }
+        }
+
+        if (chunkIdsToAck.isEmpty()) {
+            // not a chunked message, or no chunk id to ack: ack the message-id itself
+            writeAckCommand(cnx, Commands.newAck(consumerId, msgId.getLedgerId(), msgId.getEntryId(),
+                    lastCumulativeAckSet, ackType, validationError, map), flush);
+        } else if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())
+                && ackType != AckType.Cumulative) {
+            List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(chunkIdsToAck.size());
+            for (MessageIdImpl cMsgId : chunkIdsToAck) {
+                entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
+            }
+            writeAckCommand(cnx, Commands.newMultiMessageAck(consumerId, entriesToAck), flush);
+        } else {
+            for (MessageIdImpl cMsgId : chunkIdsToAck) {
+                writeAckCommand(cnx, Commands.newAck(consumerId, cMsgId.getLedgerId(), cMsgId.getEntryId(),
+                        lastCumulativeAckSet, ackType, validationError, map), flush);
+            }
+        }
+
+        if (chunkMsgIds != null) {
+            // the chunks were acked (or there was nothing to ack), so the sequence is no longer needed
+            this.consumer.unAckedChunckedMessageIdSequenceMap.remove(msgId);
+        }
+    }
+
+    /** Writes one ack command to the connection, flushing it immediately when {@code flush} is set. */
+    private void writeAckCommand(ClientCnx cnx, ByteBuf cmd, boolean flush) {
+        if (flush) {
+            cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+        } else {
+            cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+        }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 12a52e3..9aaec55 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -96,6 +96,9 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
 
     @Override
     public CompletableFuture<Producer<T>> createAsync() {
+        // config validation
+        checkArgument(!(conf.isBatchingEnabled() && conf.isChunkingEnabled()),
+                "Batching and chunking of messages can't be enabled together");
         if (conf.getTopicName() == null) {
             return FutureUtil
                     .failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder"));
@@ -187,6 +190,12 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
     }
 
+    /**
+     * Stores the chunking flag in the producer configuration of this builder.
+     *
+     * <p>{@code createAsync()} rejects a configuration in which batching and chunking are both enabled.
+     *
+     * @param chunkingEnabled value passed to {@code conf.setChunkingEnabled}
+     * @return this builder
+     */
     @Override
+    public ProducerBuilder<T> enableChunking(boolean chunkingEnabled) {
+        conf.setChunkingEnabled(chunkingEnabled);
+        return this;
+    }
+
+    @Override
     public ProducerBuilder<T> cryptoKeyReader(@NonNull CryptoKeyReader cryptoKeyReader) {
         conf.setCryptoKeyReader(cryptoKeyReader);
         return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index c249d8f..8f6f59c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -39,12 +39,15 @@ import io.netty.util.ReferenceCountUtil;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import io.netty.util.concurrent.ScheduledFuture;
+
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
@@ -66,9 +69,11 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Commands.ChecksumType;
@@ -365,7 +370,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
             // validate msg-size (For batching this will be check at the batch completion size)
             int compressedSize = compressedPayload.readableBytes();
-            if (compressedSize > ClientCnx.getMaxMessageSize()) {
+            if (compressedSize > ClientCnx.getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
                 compressedPayload.release();
                 String compressedStr = (!isBatchMessagingEnabled() && conf.getCompressionType() != CompressionType.NONE)
                                            ? "Compressed"
@@ -392,95 +397,143 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             return;
         }
 
+        // send in chunks
+        int totalChunks = canAddToBatch(msg) ? 1
+                : Math.max(1, compressedPayload.readableBytes()) / ClientCnx.getMaxMessageSize()
+                        + (Math.max(1, compressedPayload.readableBytes()) % ClientCnx.getMaxMessageSize() == 0 ? 0 : 1);
+        // each chunk is sent as an individual message, so acquire a send-permit for every additional chunk
+        for (int i = 0; i < (totalChunks - 1); i++) {
+            if (!canEnqueueRequest(callback, message.getSequenceId())) {
+                return;
+            }
+        }
+        
         try {
             synchronized (this) {
-                long sequenceId;
-                if (!msgMetadataBuilder.hasSequenceId()) {
-                    sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
-                    msgMetadataBuilder.setSequenceId(sequenceId);
-                } else {
-                    sequenceId = msgMetadataBuilder.getSequenceId();
+                int readStartIndex = 0;
+                String uuid = UUID.randomUUID().toString();
+                for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+                    serializeAndSendMessage(msg, msgMetadataBuilder, payload, uuid, chunkId, totalChunks,
+                            readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload,
+                            compressedPayload.readableBytes(), uncompressedSize, callback);
+                    readStartIndex = ((chunkId + 1) * ClientCnx.getMaxMessageSize());
                 }
-                if (!msgMetadataBuilder.hasPublishTime()) {
-                    msgMetadataBuilder.setPublishTime(client.getClientClock().millis());
+            }
+        } catch (PulsarClientException e) {
+            e.setSequenceId(msg.getSequenceId());
+            completeCallbackAndReleaseSemaphore(callback, e);
+        } catch (Throwable t) {
+            completeCallbackAndReleaseSemaphore(callback, new PulsarClientException(t, msg.getSequenceId()));
+        }
+    }
 
-                    checkArgument(!msgMetadataBuilder.hasProducerName());
+    private void serializeAndSendMessage(MessageImpl<?> msg, Builder msgMetadataBuilder, ByteBuf payload,
+            String uuid, int chunkId, int totalChunks, int readStartIndex, int chunkMaxSizeInBytes, ByteBuf compressedPayload,
+            int compressedPayloadSize,
+            int uncompressedSize, SendCallback callback) throws IOException, InterruptedException {
+        ByteBuf chunkPayload = compressedPayload;
+        Builder chunkMsgMetadataBuilder = msgMetadataBuilder;
+        if (totalChunks > 1 && TopicName.get(topic).isPersistent()) {
+            chunkPayload = compressedPayload.slice(readStartIndex,
+                    Math.min(chunkMaxSizeInBytes, chunkPayload.readableBytes() - readStartIndex));
+            // don't retain the last chunk's payload or clone its builder: they are not needed for a further
+            // chunk iteration and will be released once this chunk message is sent
+            if (chunkId != totalChunks - 1) {
+                chunkPayload.retain();
+                chunkMsgMetadataBuilder = msgMetadataBuilder.clone();
+            }
+            chunkMsgMetadataBuilder.setUuid(uuid);
+            chunkMsgMetadataBuilder.setChunkId(chunkId);
+            chunkMsgMetadataBuilder.setNumChunksFromMsg(totalChunks);
+            chunkMsgMetadataBuilder.setTotalChunkMsgSize(compressedPayloadSize);
+        }
+        long sequenceId;
+        if (!chunkMsgMetadataBuilder.hasSequenceId()) {
+            sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
+            chunkMsgMetadataBuilder.setSequenceId(sequenceId);
+        } else {
+            sequenceId = chunkMsgMetadataBuilder.getSequenceId();
+        }
+        if (!chunkMsgMetadataBuilder.hasPublishTime()) {
+            chunkMsgMetadataBuilder.setPublishTime(client.getClientClock().millis());
 
-                    msgMetadataBuilder.setProducerName(producerName);
+            checkArgument(!chunkMsgMetadataBuilder.hasProducerName());
 
-                    if (conf.getCompressionType() != CompressionType.NONE) {
-                        msgMetadataBuilder.setCompression(
-                                CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType()));
-                    }
-                    msgMetadataBuilder.setUncompressedSize(uncompressedSize);
-                }
-                if (canAddToBatch(msg)) {
-                    if (canAddToCurrentBatch(msg)) {
-                        // should trigger complete the batch message, new message will add to a new batch and new batch
-                        // sequence id use the new message, so that broker can handle the message duplication
-                        if (sequenceId <= lastSequenceIdPushed) {
-                            isLastSequenceIdPotentialDuplicated = true;
-                            if (sequenceId <= lastSequenceIdPublished) {
-                                log.warn("Message with sequence id {} is definitely a duplicate", sequenceId);
-                            } else {
-                                log.info("Message with sequence id {} might be a duplicate but cannot be determined at this time.",
-                                    sequenceId);
-                            }
-                            doBatchSendAndAdd(msg, callback, payload);
-                        } else {
-                            // Should flush the last potential duplicated since can't combine potential duplicated messages
-                            // and non-duplicated messages into a batch.
-                            if (isLastSequenceIdPotentialDuplicated) {
-                                doBatchSendAndAdd(msg, callback, payload);
-                            } else {
-                                // handle boundary cases where message being added would exceed
-                                // batch size and/or max message size
-                                boolean isBatchFull = batchMessageContainer.add(msg, callback);
-                                lastSendFuture = callback.getFuture();
-                                payload.release();
-                                if (isBatchFull) {
-                                    batchMessageAndSend();
-                                }
-                            }
-                            isLastSequenceIdPotentialDuplicated = false;
-                        }
+            chunkMsgMetadataBuilder.setProducerName(producerName);
+
+            if (conf.getCompressionType() != CompressionType.NONE) {
+                chunkMsgMetadataBuilder
+                        .setCompression(CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType()));
+            }
+            chunkMsgMetadataBuilder.setUncompressedSize(uncompressedSize);
+        }
+
+        if (canAddToBatch(msg) && totalChunks <= 1) {
+            if (canAddToCurrentBatch(msg)) {
+                // should trigger complete the batch message, new message will add to a new batch and new batch
+                // sequence id use the new message, so that broker can handle the message duplication
+                if (sequenceId <= lastSequenceIdPushed) {
+                    isLastSequenceIdPotentialDuplicated = true;
+                    if (sequenceId <= lastSequenceIdPublished) {
+                        log.warn("Message with sequence id {} is definitely a duplicate", sequenceId);
                     } else {
-                        doBatchSendAndAdd(msg, callback, payload);
+                        log.info("Message with sequence id {} might be a duplicate but cannot be determined at this time.",
+                            sequenceId);
                     }
+                    doBatchSendAndAdd(msg, callback, payload);
                 } else {
-                    ByteBuf encryptedPayload = encryptMessage(msgMetadataBuilder, compressedPayload);
-                    // When publishing during replication, we need to set the correct number of message in batch
-                    // This is only used in tracking the publish rate stats
-                    int numMessages = msg.getMessageBuilder().hasNumMessagesInBatch()
-                            ? msg.getMessageBuilder().getNumMessagesInBatch()
-                            : 1;
-                    final OpSendMsg op;
-                    if (msg.getSchemaState() == MessageImpl.SchemaState.Ready) {
-                        MessageMetadata msgMetadata = msgMetadataBuilder.build();
-                        ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, msgMetadata, encryptedPayload);
-                        op = OpSendMsg.create(msg, cmd, sequenceId, callback);
-                        msgMetadataBuilder.recycle();
-                        msgMetadata.recycle();
+                    // Should flush the last potential duplicated since can't combine potential duplicated messages
+                    // and non-duplicated messages into a batch.
+                    if (isLastSequenceIdPotentialDuplicated) {
+                        doBatchSendAndAdd(msg, callback, payload);
                     } else {
-                        op = OpSendMsg.create(msg, null, sequenceId, callback);
-                        op.rePopulate = () -> {
-                            MessageMetadata msgMetadata = msgMetadataBuilder.build();
-                            op.cmd = sendMessage(producerId, sequenceId, numMessages, msgMetadata, encryptedPayload);
-                            msgMetadataBuilder.recycle();
-                            msgMetadata.recycle();
-                        };
+                        // handle boundary cases where message being added would exceed
+                        // batch size and/or max message size
+                        boolean isBatchFull = batchMessageContainer.add(msg, callback);
+                        lastSendFuture = callback.getFuture();
+                        payload.release();
+                        if (isBatchFull) {
+                            batchMessageAndSend();
+                        }
                     }
-                    op.setNumMessagesInBatch(numMessages);
-                    op.setBatchSizeByte(encryptedPayload.readableBytes());
-                    lastSendFuture = callback.getFuture();
-                    processOpSendMsg(op);
+                    isLastSequenceIdPotentialDuplicated = false;
                 }
+            } else {
+                doBatchSendAndAdd(msg, callback, payload);
             }
-        } catch (PulsarClientException e) {
-            e.setSequenceId(msg.getSequenceId());
-            completeCallbackAndReleaseSemaphore(callback, e);
-        } catch (Throwable t) {
-            completeCallbackAndReleaseSemaphore(callback, new PulsarClientException(t, msg.getSequenceId()));
+        } else {
+            ByteBuf encryptedPayload = encryptMessage(chunkMsgMetadataBuilder, chunkPayload);
+
+            MessageMetadata msgMetadata = chunkMsgMetadataBuilder.build();
+            // When publishing during replication, we need to set the correct number of message in batch
+            // This is only used in tracking the publish rate stats
+            int numMessages = msg.getMessageBuilder().hasNumMessagesInBatch()
+                    ? msg.getMessageBuilder().getNumMessagesInBatch()
+                    : 1;
+            final OpSendMsg op;
+            if (msg.getSchemaState() == MessageImpl.SchemaState.Ready) {
+                ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, msgMetadata, encryptedPayload);
+                op = OpSendMsg.create(msg, cmd, sequenceId, callback);
+                chunkMsgMetadataBuilder.recycle();
+                msgMetadata.recycle();
+            } else {
+                op = OpSendMsg.create(msg, null, sequenceId, callback);
+                final Builder tmpBuilder = chunkMsgMetadataBuilder;
+                op.rePopulate = () -> {
+                    MessageMetadata metadata = msgMetadataBuilder.build();
+                    op.cmd = sendMessage(producerId, sequenceId, numMessages, metadata, encryptedPayload);
+                    tmpBuilder.recycle();
+                    msgMetadata.recycle();
+                };
+            }
+            op.setNumMessagesInBatch(numMessages);
+            op.setBatchSizeByte(encryptedPayload.readableBytes());
+            if (totalChunks > 1) {
+                op.totalChunks = totalChunks;
+                op.chunkId = chunkId;
+            }
+            lastSendFuture = callback.getFuture();
+            processOpSendMsg(op);
         }
     }
 
@@ -876,9 +929,18 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                         last -> Math.max(last, getHighestSequenceId(finalOp)));
                 op.setMessageId(ledgerId, entryId, partitionIndex);
                 try {
-                    // Need to protect ourselves from any exception being thrown in the future handler from the
-                    // application
-                    op.callback.sendComplete(null);
+                    // if message is chunked then call callback only on last chunk
+                    if (op.totalChunks <= 1 || (op.chunkId == op.totalChunks - 1)) {
+                        try {
+
+                            // Need to protect ourselves from any exception being thrown in the future handler from the
+                            // application
+                            op.callback.sendComplete(null);
+                        } catch (Throwable t) {
+                            log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic,
+                                    producerName, sequenceId, t);
+                        }
+                    }
                 } catch (Throwable t) {
                     log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName,
                             sequenceId, t);
@@ -1010,6 +1072,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         long batchSizeByte = 0;
         int numMessagesInBatch = 1;
         long highestSequenceId;
+        int totalChunks = 0;
+        int chunkId = -1;
 
         static OpSendMsg create(MessageImpl<?> msg, ByteBufPair cmd, long sequenceId, SendCallback callback) {
             OpSendMsg op = RECYCLER.get();
@@ -1052,6 +1116,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             sequenceId = -1L;
             createdAt = -1L;
             highestSequenceId = -1L;
+            totalChunks = 0;
+            chunkId = -1;
             recyclerHandle.recycle(this);
         }
 
@@ -1396,7 +1462,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     // Need to protect ourselves from any exception being thrown in the future handler from the
                     // application
                     ex.setSequenceId(op.sequenceId);
-                    op.callback.sendComplete(ex);
+                    // if message is chunked then call callback only on last chunk
+                    if (op.totalChunks <= 1 || (op.chunkId == op.totalChunks - 1)) {
+                        // Need to protect ourselves from any exception being thrown in the future handler from the
+                        // application
+                        op.callback.sendComplete(ex);
+                    }
                 } catch (Throwable t) {
                     log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName,
                             op.sequenceId, t);
@@ -1511,7 +1582,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         }
     }
 
-    private void processOpSendMsg(OpSendMsg op) {
+    protected void processOpSendMsg(OpSendMsg op) {
         if (op == null) {
             return;
         }
@@ -1672,4 +1743,4 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     }
 
     private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class);
-}
+}
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 3e85d48..dfce149 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -131,6 +131,7 @@ public class UnAckedMessageTracker implements Closeable {
                     if (!headPartition.isEmpty()) {
                         log.warn("[{}] {} messages have timed-out", consumerBase, headPartition.size());
                         headPartition.forEach(messageId -> {
+                            addChunkedMessageIdsAndRemoveFromSequnceMap(messageId, messageIds, consumerBase);
                             messageIds.add(messageId);
                             messageIdPartitionMap.remove(messageId);
                         });
@@ -150,6 +151,19 @@ public class UnAckedMessageTracker implements Closeable {
         }, this.tickDurationInMs, TimeUnit.MILLISECONDS);
     }
 
+    public static void addChunkedMessageIdsAndRemoveFromSequnceMap(MessageId messageId, Set<MessageId> messageIds,
+            ConsumerBase<?> consumerBase) {
+        if (!(messageId instanceof MessageIdImpl)) {
+            return; // the chunk-sequence map is only consulted for MessageIdImpl ids
+        }
+        final MessageIdImpl chunkKey = (MessageIdImpl) messageId;
+        final MessageIdImpl[] chunkIds = consumerBase.unAckedChunckedMessageIdSequenceMap.get(chunkKey);
+        for (int i = 0; chunkIds != null && i < chunkIds.length; i++) {
+            messageIds.add(chunkIds[i]); // every chunk id recorded for this message joins the caller's set
+        }
+        consumerBase.unAckedChunckedMessageIdSequenceMap.remove(chunkKey); // mapping is dropped either way
+    }
+
     public void clear() {
         writeLock.lock();
         try {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 1314327..eb82703 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -84,6 +84,13 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
     private long tickDurationMillis = 1000;
 
     private int priorityLevel = 0;
+    
+    // max number of pending chunked messages, so that incomplete messages do not pile up in the queue and memory
+    private int maxPendingChuckedMessage = 10;
+    
+    private boolean autoAckOldestChunkedMessageOnQueueFull = false;
+
+    private long expireTimeOfIncompleteChunkedMessageMillis = 60 * 1000;
 
     @JsonIgnore
     private CryptoKeyReader cryptoKeyReader = null;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index b6a0763..fd2f678 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -77,6 +77,7 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
     private boolean batchingEnabled = true; // enabled by default
     @JsonIgnore
     private BatcherBuilder batcherBuilder = BatcherBuilder.DEFAULT;
+    private boolean chunkingEnabled = false;
 
     @JsonIgnore
     private CryptoKeyReader cryptoKeyReader;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index fc6df91..eac424e 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -47,6 +48,7 @@ public class AcknowledgementsGroupingTrackerTest {
     public void setup() {
         eventLoopGroup = new NioEventLoopGroup(1);
         consumer = mock(ConsumerImpl.class);
+        consumer.unAckedChunckedMessageIdSequenceMap = new ConcurrentOpenHashMap<>();
         cnx = mock(ClientCnx.class);
         ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
         when(cnx.ctx()).thenReturn(ctx);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index d3b9f3c..9691cc9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -3702,6 +3702,22 @@ public final class PulsarApi {
     // optional bool null_value = 25 [default = false];
     boolean hasNullValue();
     boolean getNullValue();
+    
+    // optional string uuid = 26;
+    boolean hasUuid();
+    String getUuid();
+    
+    // optional int32 num_chunks_from_msg = 27;
+    boolean hasNumChunksFromMsg();
+    int getNumChunksFromMsg();
+    
+    // optional int32 total_chunk_msg_size = 28;
+    boolean hasTotalChunkMsgSize();
+    int getTotalChunkMsgSize();
+    
+    // optional int32 chunk_id = 29;
+    boolean hasChunkId();
+    int getChunkId();
   }
   public static final class MessageMetadata extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -4082,6 +4098,68 @@ public final class PulsarApi {
       return nullValue_;
     }
     
+    // optional string uuid = 26;
+    public static final int UUID_FIELD_NUMBER = 26;
+    private java.lang.Object uuid_;
+    public boolean hasUuid() {
+      return ((bitField0_ & 0x00100000) == 0x00100000);
+    }
+    public String getUuid() {
+      java.lang.Object ref = uuid_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString bs = 
+            (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.isValidUtf8(bs)) {
+          uuid_ = s;
+        }
+        return s;
+      }
+    }
+    private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString getUuidBytes() {
+      java.lang.Object ref = uuid_;
+      if (ref instanceof String) {
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString b = 
+            org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8((String) ref);
+        uuid_ = b;
+        return b;
+      } else {
+        return (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+      }
+    }
+    
+    // optional int32 num_chunks_from_msg = 27;
+    public static final int NUM_CHUNKS_FROM_MSG_FIELD_NUMBER = 27;
+    private int numChunksFromMsg_;
+    public boolean hasNumChunksFromMsg() {
+      return ((bitField0_ & 0x00200000) == 0x00200000);
+    }
+    public int getNumChunksFromMsg() {
+      return numChunksFromMsg_;
+    }
+    
+    // optional int32 total_chunk_msg_size = 28;
+    public static final int TOTAL_CHUNK_MSG_SIZE_FIELD_NUMBER = 28;
+    private int totalChunkMsgSize_;
+    public boolean hasTotalChunkMsgSize() {
+      return ((bitField0_ & 0x00400000) == 0x00400000);
+    }
+    public int getTotalChunkMsgSize() {
+      return totalChunkMsgSize_;
+    }
+    
+    // optional int32 chunk_id = 29;
+    public static final int CHUNK_ID_FIELD_NUMBER = 29;
+    private int chunkId_;
+    public boolean hasChunkId() {
+      return ((bitField0_ & 0x00800000) == 0x00800000);
+    }
+    public int getChunkId() {
+      return chunkId_;
+    }
+    
     private void initFields() {
       producerName_ = "";
       sequenceId_ = 0L;
@@ -4106,6 +4184,10 @@ public final class PulsarApi {
       txnidMostBits_ = 0L;
       highestSequenceId_ = 0L;
       nullValue_ = false;
+      uuid_ = "";
+      numChunksFromMsg_ = 0;
+      totalChunkMsgSize_ = 0;
+      chunkId_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4217,6 +4299,18 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00080000) == 0x00080000)) {
         output.writeBool(25, nullValue_);
       }
+      if (((bitField0_ & 0x00100000) == 0x00100000)) {
+        output.writeBytes(26, getUuidBytes());
+      }
+      if (((bitField0_ & 0x00200000) == 0x00200000)) {
+        output.writeInt32(27, numChunksFromMsg_);
+      }
+      if (((bitField0_ & 0x00400000) == 0x00400000)) {
+        output.writeInt32(28, totalChunkMsgSize_);
+      }
+      if (((bitField0_ & 0x00800000) == 0x00800000)) {
+        output.writeInt32(29, chunkId_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -4322,6 +4416,22 @@ public final class PulsarApi {
         size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeBoolSize(25, nullValue_);
       }
+      if (((bitField0_ & 0x00100000) == 0x00100000)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBytesSize(26, getUuidBytes());
+      }
+      if (((bitField0_ & 0x00200000) == 0x00200000)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeInt32Size(27, numChunksFromMsg_);
+      }
+      if (((bitField0_ & 0x00400000) == 0x00400000)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeInt32Size(28, totalChunkMsgSize_);
+      }
+      if (((bitField0_ & 0x00800000) == 0x00800000)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeInt32Size(29, chunkId_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -4481,6 +4591,14 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00200000);
         nullValue_ = false;
         bitField0_ = (bitField0_ & ~0x00400000);
+        uuid_ = "";
+        bitField0_ = (bitField0_ & ~0x00800000);
+        numChunksFromMsg_ = 0;
+        bitField0_ = (bitField0_ & ~0x01000000);
+        totalChunkMsgSize_ = 0;
+        bitField0_ = (bitField0_ & ~0x02000000);
+        chunkId_ = 0;
+        bitField0_ = (bitField0_ & ~0x04000000);
         return this;
       }
       
@@ -4610,6 +4728,22 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00080000;
         }
         result.nullValue_ = nullValue_;
+        if (((from_bitField0_ & 0x00800000) == 0x00800000)) {
+          to_bitField0_ |= 0x00100000;
+        }
+        result.uuid_ = uuid_;
+        if (((from_bitField0_ & 0x01000000) == 0x01000000)) {
+          to_bitField0_ |= 0x00200000;
+        }
+        result.numChunksFromMsg_ = numChunksFromMsg_;
+        if (((from_bitField0_ & 0x02000000) == 0x02000000)) {
+          to_bitField0_ |= 0x00400000;
+        }
+        result.totalChunkMsgSize_ = totalChunkMsgSize_;
+        if (((from_bitField0_ & 0x04000000) == 0x04000000)) {
+          to_bitField0_ |= 0x00800000;
+        }
+        result.chunkId_ = chunkId_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -4706,6 +4840,18 @@ public final class PulsarApi {
         if (other.hasNullValue()) {
           setNullValue(other.getNullValue());
         }
+        if (other.hasUuid()) {
+          setUuid(other.getUuid());
+        }
+        if (other.hasNumChunksFromMsg()) {
+          setNumChunksFromMsg(other.getNumChunksFromMsg());
+        }
+        if (other.hasTotalChunkMsgSize()) {
+          setTotalChunkMsgSize(other.getTotalChunkMsgSize());
+        }
+        if (other.hasChunkId()) {
+          setChunkId(other.getChunkId());
+        }
         return this;
       }
       
@@ -4880,6 +5026,26 @@ public final class PulsarApi {
               nullValue_ = input.readBool();
               break;
             }
+            case 210: {
+              bitField0_ |= 0x00800000;
+              uuid_ = input.readBytes();
+              break;
+            }
+            case 216: {
+              bitField0_ |= 0x01000000;
+              numChunksFromMsg_ = input.readInt32();
+              break;
+            }
+            case 224: {
+              bitField0_ |= 0x02000000;
+              totalChunkMsgSize_ = input.readInt32();
+              break;
+            }
+            case 232: {
+              bitField0_ |= 0x04000000;
+              chunkId_ = input.readInt32();
+              break;
+            }
           }
         }
       }
@@ -5612,6 +5778,105 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional string uuid = 26;
+      private java.lang.Object uuid_ = "";
+      public boolean hasUuid() {
+        return ((bitField0_ & 0x00800000) == 0x00800000);
+      }
+      public String getUuid() {
+        java.lang.Object ref = uuid_;
+        if (!(ref instanceof String)) {
+          String s = ((org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref).toStringUtf8();
+          uuid_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setUuid(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00800000;
+        uuid_ = value;
+        
+        return this;
+      }
+      public Builder clearUuid() {
+        bitField0_ = (bitField0_ & ~0x00800000);
+        uuid_ = getDefaultInstance().getUuid();
+        
+        return this;
+      }
+      void setUuid(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString value) {
+        bitField0_ |= 0x00800000;
+        uuid_ = value;
+        
+      }
+      
+      // optional int32 num_chunks_from_msg = 27;
+      private int numChunksFromMsg_ ;
+      public boolean hasNumChunksFromMsg() {
+        return ((bitField0_ & 0x01000000) == 0x01000000);
+      }
+      public int getNumChunksFromMsg() {
+        return numChunksFromMsg_;
+      }
+      public Builder setNumChunksFromMsg(int value) {
+        bitField0_ |= 0x01000000;
+        numChunksFromMsg_ = value;
+        
+        return this;
+      }
+      public Builder clearNumChunksFromMsg() {
+        bitField0_ = (bitField0_ & ~0x01000000);
+        numChunksFromMsg_ = 0;
+        
+        return this;
+      }
+      
+      // optional int32 total_chunk_msg_size = 28;
+      private int totalChunkMsgSize_ ;
+      public boolean hasTotalChunkMsgSize() {
+        return ((bitField0_ & 0x02000000) == 0x02000000);
+      }
+      public int getTotalChunkMsgSize() {
+        return totalChunkMsgSize_;
+      }
+      public Builder setTotalChunkMsgSize(int value) {
+        bitField0_ |= 0x02000000;
+        totalChunkMsgSize_ = value;
+        
+        return this;
+      }
+      public Builder clearTotalChunkMsgSize() {
+        bitField0_ = (bitField0_ & ~0x02000000);
+        totalChunkMsgSize_ = 0;
+        
+        return this;
+      }
+      
+      // optional int32 chunk_id = 29;
+      private int chunkId_ ;
+      public boolean hasChunkId() {
+        return ((bitField0_ & 0x04000000) == 0x04000000);
+      }
+      public int getChunkId() {
+        return chunkId_;
+      }
+      public Builder setChunkId(int value) {
+        bitField0_ |= 0x04000000;
+        chunkId_ = value;
+        
+        return this;
+      }
+      public Builder clearChunkId() {
+        bitField0_ = (bitField0_ & ~0x04000000);
+        chunkId_ = 0;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.MessageMetadata)
     }
     
@@ -16125,6 +16390,10 @@ public final class PulsarApi {
     // optional uint64 highest_sequence_id = 6 [default = 0];
     boolean hasHighestSequenceId();
     long getHighestSequenceId();
+    
+    // optional bool is_chunk = 7 [default = false];
+    boolean hasIsChunk();
+    boolean getIsChunk();
   }
   public static final class CommandSend extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -16221,6 +16490,16 @@ public final class PulsarApi {
       return highestSequenceId_;
     }
     
+    // optional bool is_chunk = 7 [default = false];
+    public static final int IS_CHUNK_FIELD_NUMBER = 7;
+    private boolean isChunk_;
+    public boolean hasIsChunk() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    public boolean getIsChunk() {
+      return isChunk_;
+    }
+    
     private void initFields() {
       producerId_ = 0L;
       sequenceId_ = 0L;
@@ -16228,6 +16507,7 @@ public final class PulsarApi {
       txnidLeastBits_ = 0L;
       txnidMostBits_ = 0L;
       highestSequenceId_ = 0L;
+      isChunk_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -16272,6 +16552,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000020) == 0x00000020)) {
         output.writeUInt64(6, highestSequenceId_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeBool(7, isChunk_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -16304,6 +16587,10 @@ public final class PulsarApi {
         size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeUInt64Size(6, highestSequenceId_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBoolSize(7, isChunk_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -16429,6 +16716,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000010);
         highestSequenceId_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000020);
+        isChunk_ = false;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
       
@@ -16486,6 +16775,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000020;
         }
         result.highestSequenceId_ = highestSequenceId_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        result.isChunk_ = isChunk_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -16510,6 +16803,9 @@ public final class PulsarApi {
         if (other.hasHighestSequenceId()) {
           setHighestSequenceId(other.getHighestSequenceId());
         }
+        if (other.hasIsChunk()) {
+          setIsChunk(other.getIsChunk());
+        }
         return this;
       }
       
@@ -16577,6 +16873,11 @@ public final class PulsarApi {
               highestSequenceId_ = input.readUInt64();
               break;
             }
+            case 56: {
+              bitField0_ |= 0x00000040;
+              isChunk_ = input.readBool();
+              break;
+            }
           }
         }
       }
@@ -16709,6 +17010,27 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional bool is_chunk = 7 [default = false];
+      private boolean isChunk_ ;
+      public boolean hasIsChunk() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      public boolean getIsChunk() {
+        return isChunk_;
+      }
+      public Builder setIsChunk(boolean value) {
+        bitField0_ |= 0x00000040;
+        isChunk_ = value;
+        
+        return this;
+      }
+      public Builder clearIsChunk() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        isChunk_ = false;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandSend)
     }
     
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index 460c50a..357407a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -41,6 +41,9 @@ public class ConsumerStats {
     /** Total rate of messages redelivered by this consumer (msg/s). */
     public double msgRateRedeliver;
 
+    /** Chunked message dispatch rate. */
+    public double chuckedMessageRate;
+
     /** Name of the consumer. */
     public String consumerName;
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentPublisherStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentPublisherStats.java
index 0d661dd..f1602a4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentPublisherStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentPublisherStats.java
@@ -36,4 +36,4 @@ public class NonPersistentPublisherStats extends PublisherStats {
         this.msgDropRate += stats.msgDropRate;
         return this;
     }
-}
+}
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java
index cec2257..c500033 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java
@@ -37,6 +37,9 @@ public class PublisherStats {
     /** Average message size published by this publisher. */
     public double averageMsgSize;
 
+    /** Rate of chunked messages received from this publisher. */
+    public double chunkedMessageRate;
+
     /** Id of this publisher. */
     public long producerId;
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 6994632..18002b5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -43,6 +43,9 @@ public class SubscriptionStats {
     /** Total rate of messages redelivered on this subscription (msg/s). */
     public double msgRateRedeliver;
 
+    /** Chunked message dispatch rate. */
+    public int chuckedMessageRate;
+
     /** Number of messages in the subscription backlog. */
     public long msgBacklog;
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index be471e3..8d560a9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -58,6 +58,9 @@ public class TopicStats {
     /** Average size of published messages (bytes). */
     public double averageMsgSize;
 
+    /** Topic has chunked message published on it. */
+    public boolean msgChunkPublished;
+
     /** Space used to store the messages for the topic (bytes). */
     public long storageSize;
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index cbc4a39..d5e772a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -488,6 +488,9 @@ public class Commands {
         if (txnIdMostBits > 0) {
             sendBuilder.setTxnidMostBits(txnIdMostBits);
         }
+        if (messageData.hasTotalChunkMsgSize() && messageData.getTotalChunkMsgSize() > 1) {
+            sendBuilder.setIsChunk(true);
+        }
         CommandSend send = sendBuilder.build();
 
         ByteBufPair res = serializeCommandSendWithSize(BaseCommand.newBuilder().setType(Type.SEND).setSend(send),
@@ -513,6 +516,9 @@ public class Commands {
         if (txnIdMostBits > 0) {
             sendBuilder.setTxnidMostBits(txnIdMostBits);
         }
+        if (messageData.hasTotalChunkMsgSize() && messageData.getTotalChunkMsgSize() > 1) {
+            sendBuilder.setIsChunk(true);
+        }
         CommandSend send = sendBuilder.build();
 
         ByteBufPair res = serializeCommandSendWithSize(BaseCommand.newBuilder().setType(Type.SEND).setSend(send),
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index a0aa8a7..cd7b760 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -139,6 +139,10 @@ message MessageMetadata {
 
     // Indicate if the message payload value is set
     optional bool null_value = 25 [ default = false ];
+    optional string uuid = 26;
+	optional int32 num_chunks_from_msg = 27;
+	optional int32 total_chunk_msg_size = 28;
+	optional int32 chunk_id = 29;
 }
 
 message SingleMessageMetadata {
@@ -439,6 +443,7 @@ message CommandSend {
 
     /// Add highest sequence id to support batch message with external sequence id
     optional uint64 highest_sequence_id = 6 [default = 0];
+    optional bool is_chunk     =7 [default = false];
 }
 
 message CommandSendReceipt {
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index b667eff..61b027d 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -119,6 +119,17 @@ public class PerformanceConsumer {
         @Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name")
         public String authPluginClassName;
 
+        @Parameter(names = { "-mc", "--max_chunked_msg" }, description = "Max pending chunk messages")
+        private int maxPendingChuckedMessage = 0;
+
+        @Parameter(names = { "-ac",
+                "--auto_ack_chunk_q_full" }, description = "Auto ack for oldest message on queue is full")
+        private boolean autoAckOldestChunkedMessageOnQueueFull = false;
+
+        @Parameter(names = { "-e",
+                "--expire_time_incomplete_chunked_messages" }, description = "Expire time in ms for incomplete chunk messages")
+        private long expireTimeOfIncompleteChunkedMessageMs = 0;
+
         @Parameter(
             names = { "--auth-params" },
             description = "Authentication parameters, whose format is determined by the implementation " +
@@ -274,7 +285,15 @@ public class PerformanceConsumer {
                 .acknowledgmentGroupTime(arguments.acknowledgmentsGroupingDelayMillis, TimeUnit.MILLISECONDS) //
                 .subscriptionType(arguments.subscriptionType)
                 .subscriptionInitialPosition(arguments.subscriptionInitialPosition)
+                .autoAckOldestChunkedMessageOnQueueFull(arguments.autoAckOldestChunkedMessageOnQueueFull)
                 .replicateSubscriptionState(arguments.replicatedSubscription);
+        if (arguments.maxPendingChuckedMessage > 0) {
+            consumerBuilder.maxPendingChuckedMessage(arguments.maxPendingChuckedMessage);
+        }
+        if (arguments.expireTimeOfIncompleteChunkedMessageMs > 0) {
+            consumerBuilder.expireTimeOfIncompleteChunkedMessage(arguments.expireTimeOfIncompleteChunkedMessageMs,
+                    TimeUnit.MILLISECONDS);
+        }
 
         if (arguments.encKeyName != null) {
             byte[] pKey = Files.readAllBytes(Paths.get(arguments.encKeyFile));
@@ -371,4 +390,4 @@ public class PerformanceConsumer {
     }
 
     private static final Logger log = LoggerFactory.getLogger(PerformanceConsumer.class);
-}
+}
\ No newline at end of file
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 0bd40c3..9e9d0e8 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -122,6 +122,10 @@ public class PerformanceProducer {
         @Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name")
         public String authPluginClassName;
 
+        @Parameter(names = { "-ch",
+                "--chunking" }, description = "Should split the message and publish in chunks if message size is larger than allowed max size")
+        private boolean chunkingAllowed = false;
+
         @Parameter(
             names = { "--auth-params" },
             description = "Authentication parameters, whose format is determined by the implementation " +
@@ -449,7 +453,12 @@ public class PerformanceProducer {
                 log.info("Adding {} publishers on topic {}", arguments.numProducers, topic);
 
                 for (int j = 0; j < arguments.numProducers; j++) {
-                    futures.add(producerBuilder.clone().topic(topic).createAsync());
+                    ProducerBuilder<byte[]> prodBuilder = producerBuilder.clone().topic(topic);
+                    if (arguments.chunkingAllowed) {
+                        prodBuilder.enableChunking(true);
+                        prodBuilder.enableBatching(false);
+                    }
+                    futures.add(prodBuilder.createAsync());
                 }
             }
 
@@ -578,4 +587,4 @@ public class PerformanceProducer {
     static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
     static final DecimalFormat totalFormat = new DecimalFormat("0.000");
     private static final Logger log = LoggerFactory.getLogger(PerformanceProducer.class);
-}
+}
\ No newline at end of file