Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/10/25 01:22:49 UTC

[pulsar-client-go] branch master updated: [Issue 456] Support chunking for big messages. (#805)

This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 77c7ccb  [Issue 456] Support chunking for big messages. (#805)
77c7ccb is described below

commit 77c7ccbd144b00d17c320c5f67cadaedb53f6b1e
Author: Jiaqi Shen <18...@163.com>
AuthorDate: Tue Oct 25 09:22:43 2022 +0800

    [Issue 456] Support chunking for big messages. (#805)
    
    Master Issue: [#456](https://github.com/apache/pulsar-client-go/issues/456)
    
    ### Motivation
    
    Make the Pulsar Go client support chunking so that it can produce and consume big messages. The earlier implementation ([#717](https://github.com/apache/pulsar-client-go/pull/717)) did not account for many details, so I decided to reimplement it.
    
    ### Modifications
    
    - Add `internalSingleSend` to send messages without batching, since chunked messages cannot be produced in batches.
    - Move the `BlockIfQueueFull` check from `internalSendAsync` to `internalSend` (`canAddQueue`) so that blocking behaves correctly when chunking.
    - Make the producer split big messages into chunks when sending.
    - Add `chunkedMsgCtxMap` to store chunked message metadata and payload data on the consumer side.
    - Make the consumer reassemble the chunks and consume the big message. A usage sketch of the new options follows below.
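    
    A minimal usage sketch of the new chunking options, based on the tests in this change. The broker URL, topic name, and option values here are illustrative placeholders, not recommended settings:
    
    ```go
    package main
    
    import (
    	"context"
    	"log"
    	"time"
    
    	"github.com/apache/pulsar-client-go/pulsar"
    )
    
    func main() {
    	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer client.Close()
    
    	// Chunking requires batching to be disabled; enabling both makes CreateProducer return an error.
    	producer, err := client.CreateProducer(pulsar.ProducerOptions{
    		Topic:               "persistent://public/default/big-messages", // placeholder topic
    		DisableBatching:     true,
    		EnableChunking:      true,
    		ChunkMaxMessageSize: 64 * 1024, // optional; only takes effect when smaller than the broker's max message size
    	})
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer producer.Close()
    
    	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    		Topic:                       "persistent://public/default/big-messages",
    		SubscriptionName:            "chunk-sub",
    		Type:                        pulsar.Exclusive,
    		MaxPendingChunkedMessage:    100,         // default shown explicitly
    		ExpireTimeOfIncompleteChunk: time.Minute, // default shown explicitly
    		AutoAckIncompleteChunk:      false,
    	})
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer consumer.Close()
    
    	// A payload larger than the broker's max message size is split into chunks on send
    	// and reassembled transparently before Receive returns it as a single message.
    	payload := make([]byte, 6*1024*1024)
    	if _, err := producer.Send(context.Background(), &pulsar.ProducerMessage{Payload: payload}); err != nil {
    		log.Fatal(err)
    	}
    
    	msg, err := consumer.Receive(context.Background())
    	if err != nil {
    		log.Fatal(err)
    	}
    	if err := consumer.Ack(msg); err != nil {
    		log.Fatal(err)
    	}
    }
    ```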
---
 pulsar/consumer.go                         |  10 +
 pulsar/consumer_impl.go                    | 105 +++---
 pulsar/consumer_multitopic.go              |   6 +-
 pulsar/consumer_partition.go               | 444 +++++++++++++++++++---
 pulsar/consumer_regex.go                   |   6 +-
 pulsar/impl_message.go                     |  52 +++
 pulsar/internal/batch_builder.go           |  36 +-
 pulsar/internal/commands.go                |  46 ++-
 pulsar/internal/key_based_batch_builder.go |   6 +-
 pulsar/message_chunking_test.go            | 570 +++++++++++++++++++++++++++++
 pulsar/producer.go                         |   9 +
 pulsar/producer_impl.go                    |   4 +
 pulsar/producer_partition.go               | 484 ++++++++++++++++++------
 pulsar/producer_test.go                    |  36 +-
 14 files changed, 1586 insertions(+), 228 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 574d6af..ffef786 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -198,6 +198,16 @@ type ConsumerOptions struct {
 	// error information of the Ack method only contains errors that may occur in the Go SDK's own processing.
 	// Default: false
 	AckWithResponse bool
+
+	// MaxPendingChunkedMessage sets the maximum pending chunked messages. (default: 100)
+	MaxPendingChunkedMessage int
+
+	// ExpireTimeOfIncompleteChunk sets the expiry time for discarding an incomplete chunked message. (default: 60 seconds)
+	ExpireTimeOfIncompleteChunk time.Duration
+
+	// AutoAckIncompleteChunk sets whether the consumer auto-acknowledges an incomplete chunked message when it has to
+	// be removed (e.g. the chunked message pending queue is full). (default: false)
+	AutoAckIncompleteChunk bool
 }
 
 // Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 517824d..a402a4d 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -19,7 +19,6 @@ package pulsar
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"math/rand"
 	"strconv"
@@ -37,9 +36,9 @@ const defaultNackRedeliveryDelay = 1 * time.Minute
 
 type acker interface {
 	// AckID does not handle errors returned by the Broker side, so no need to wait for doneCh to finish.
-	AckID(id trackingMessageID) error
-	AckIDWithResponse(id trackingMessageID) error
-	NackID(id trackingMessageID)
+	AckID(id MessageID) error
+	AckIDWithResponse(id MessageID) error
+	NackID(id MessageID)
 	NackMsg(msg Message)
 }
 
@@ -93,6 +92,14 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
 		}
 	}
 
+	if options.MaxPendingChunkedMessage == 0 {
+		options.MaxPendingChunkedMessage = 100
+	}
+
+	if options.ExpireTimeOfIncompleteChunk == 0 {
+		options.ExpireTimeOfIncompleteChunk = time.Minute
+	}
+
 	if options.NackBackoffPolicy == nil && options.EnableDefaultNackBackoffPolicy {
 		options.NackBackoffPolicy = new(defaultNackBackoffPolicy)
 	}
@@ -344,28 +351,31 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 				nackRedeliveryDelay = c.options.NackRedeliveryDelay
 			}
 			opts := &partitionConsumerOpts{
-				topic:                      pt,
-				consumerName:               c.consumerName,
-				subscription:               c.options.SubscriptionName,
-				subscriptionType:           c.options.Type,
-				subscriptionInitPos:        c.options.SubscriptionInitialPosition,
-				partitionIdx:               idx,
-				receiverQueueSize:          receiverQueueSize,
-				nackRedeliveryDelay:        nackRedeliveryDelay,
-				nackBackoffPolicy:          c.options.NackBackoffPolicy,
-				metadata:                   metadata,
-				subProperties:              subProperties,
-				replicateSubscriptionState: c.options.ReplicateSubscriptionState,
-				startMessageID:             trackingMessageID{},
-				subscriptionMode:           durable,
-				readCompacted:              c.options.ReadCompacted,
-				interceptors:               c.options.Interceptors,
-				maxReconnectToBroker:       c.options.MaxReconnectToBroker,
-				backoffPolicy:              c.options.BackoffPolicy,
-				keySharedPolicy:            c.options.KeySharedPolicy,
-				schema:                     c.options.Schema,
-				decryption:                 c.options.Decryption,
-				ackWithResponse:            c.options.AckWithResponse,
+				topic:                       pt,
+				consumerName:                c.consumerName,
+				subscription:                c.options.SubscriptionName,
+				subscriptionType:            c.options.Type,
+				subscriptionInitPos:         c.options.SubscriptionInitialPosition,
+				partitionIdx:                idx,
+				receiverQueueSize:           receiverQueueSize,
+				nackRedeliveryDelay:         nackRedeliveryDelay,
+				nackBackoffPolicy:           c.options.NackBackoffPolicy,
+				metadata:                    metadata,
+				subProperties:               subProperties,
+				replicateSubscriptionState:  c.options.ReplicateSubscriptionState,
+				startMessageID:              trackingMessageID{},
+				subscriptionMode:            durable,
+				readCompacted:               c.options.ReadCompacted,
+				interceptors:                c.options.Interceptors,
+				maxReconnectToBroker:        c.options.MaxReconnectToBroker,
+				backoffPolicy:               c.options.BackoffPolicy,
+				keySharedPolicy:             c.options.KeySharedPolicy,
+				schema:                      c.options.Schema,
+				decryption:                  c.options.Decryption,
+				ackWithResponse:             c.options.AckWithResponse,
+				maxPendingChunkedMessage:    c.options.MaxPendingChunkedMessage,
+				expireTimeOfIncompleteChunk: c.options.ExpireTimeOfIncompleteChunk,
+				autoAckIncompleteChunk:      c.options.AutoAckIncompleteChunk,
 			}
 			cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
 			ch <- ConsumerError{
@@ -456,20 +466,15 @@ func (c *consumer) Ack(msg Message) error {
 
 // AckID the consumption of a single message, identified by its MessageID
 func (c *consumer) AckID(msgID MessageID) error {
-	mid, ok := c.messageID(msgID)
-	if !ok {
-		return errors.New("failed to convert trackingMessageID")
-	}
-
-	if mid.consumer != nil {
-		return mid.Ack()
+	if err := c.checkMsgIDPartition(msgID); err != nil {
+		return err
 	}
 
 	if c.options.AckWithResponse {
-		return c.consumers[mid.partitionIdx].AckIDWithResponse(mid)
+		return c.consumers[msgID.PartitionIdx()].AckIDWithResponse(msgID)
 	}
 
-	return c.consumers[mid.partitionIdx].AckID(mid)
+	return c.consumers[msgID.PartitionIdx()].AckID(msgID)
 }
 
 // ReconsumeLater mark a message for redelivery after custom delay
@@ -529,7 +534,7 @@ func (c *consumer) Nack(msg Message) {
 		}
 
 		if mid.consumer != nil {
-			mid.Nack()
+			mid.consumer.NackID(msg.ID())
 			return
 		}
 		c.consumers[mid.partitionIdx].NackMsg(msg)
@@ -540,17 +545,11 @@ func (c *consumer) Nack(msg Message) {
 }
 
 func (c *consumer) NackID(msgID MessageID) {
-	mid, ok := c.messageID(msgID)
-	if !ok {
-		return
-	}
-
-	if mid.consumer != nil {
-		mid.Nack()
+	if err := c.checkMsgIDPartition(msgID); err != nil {
 		return
 	}
 
-	c.consumers[mid.partitionIdx].NackID(mid)
+	c.consumers[msgID.PartitionIdx()].NackID(msgID)
 }
 
 func (c *consumer) Close() {
@@ -586,12 +585,11 @@ func (c *consumer) Seek(msgID MessageID) error {
 		return newError(SeekFailed, "for partition topic, seek command should perform on the individual partitions")
 	}
 
-	mid, ok := c.messageID(msgID)
-	if !ok {
-		return nil
+	if err := c.checkMsgIDPartition(msgID); err != nil {
+		return err
 	}
 
-	return c.consumers[mid.partitionIdx].Seek(mid)
+	return c.consumers[msgID.PartitionIdx()].Seek(msgID)
 }
 
 func (c *consumer) SeekByTime(time time.Time) error {
@@ -608,6 +606,17 @@ func (c *consumer) SeekByTime(time time.Time) error {
 	return errs
 }
 
+func (c *consumer) checkMsgIDPartition(msgID MessageID) error {
+	partition := msgID.PartitionIdx()
+	if partition < 0 || int(partition) >= len(c.consumers) {
+		c.log.Errorf("invalid partition index %d expected a partition between [0-%d]",
+			partition, len(c.consumers))
+		return fmt.Errorf("invalid partition index %d expected a partition between [0-%d]",
+			partition, len(c.consumers))
+	}
+	return nil
+}
+
 var r = &random{
 	R: rand.New(rand.NewSource(time.Now().UnixNano())),
 }
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 380dd75..d32ce31 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -137,10 +137,10 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) error {
 	}
 
 	if c.options.AckWithResponse {
-		return mid.AckWithResponse()
+		return mid.consumer.AckIDWithResponse(msgID)
 	}
 
-	return mid.Ack()
+	return mid.consumer.AckID(msgID)
 }
 
 func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) {
@@ -200,7 +200,7 @@ func (c *multiTopicConsumer) NackID(msgID MessageID) {
 		return
 	}
 
-	mid.Nack()
+	mid.consumer.NackID(msgID)
 }
 
 func (c *multiTopicConsumer) Close() {
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 5b61e7d..ebaa48b 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -18,6 +18,7 @@
 package pulsar
 
 import (
+	"container/list"
 	"encoding/hex"
 	"errors"
 	"fmt"
@@ -92,30 +93,33 @@ const (
 )
 
 type partitionConsumerOpts struct {
-	topic                      string
-	consumerName               string
-	subscription               string
-	subscriptionType           SubscriptionType
-	subscriptionInitPos        SubscriptionInitialPosition
-	partitionIdx               int
-	receiverQueueSize          int
-	nackRedeliveryDelay        time.Duration
-	nackBackoffPolicy          NackBackoffPolicy
-	metadata                   map[string]string
-	subProperties              map[string]string
-	replicateSubscriptionState bool
-	startMessageID             trackingMessageID
-	startMessageIDInclusive    bool
-	subscriptionMode           subscriptionMode
-	readCompacted              bool
-	disableForceTopicCreation  bool
-	interceptors               ConsumerInterceptors
-	maxReconnectToBroker       *uint
-	backoffPolicy              internal.BackoffPolicy
-	keySharedPolicy            *KeySharedPolicy
-	schema                     Schema
-	decryption                 *MessageDecryptionInfo
-	ackWithResponse            bool
+	topic                       string
+	consumerName                string
+	subscription                string
+	subscriptionType            SubscriptionType
+	subscriptionInitPos         SubscriptionInitialPosition
+	partitionIdx                int
+	receiverQueueSize           int
+	nackRedeliveryDelay         time.Duration
+	nackBackoffPolicy           NackBackoffPolicy
+	metadata                    map[string]string
+	subProperties               map[string]string
+	replicateSubscriptionState  bool
+	startMessageID              trackingMessageID
+	startMessageIDInclusive     bool
+	subscriptionMode            subscriptionMode
+	readCompacted               bool
+	disableForceTopicCreation   bool
+	interceptors                ConsumerInterceptors
+	maxReconnectToBroker        *uint
+	backoffPolicy               internal.BackoffPolicy
+	keySharedPolicy             *KeySharedPolicy
+	schema                      Schema
+	decryption                  *MessageDecryptionInfo
+	ackWithResponse             bool
+	maxPendingChunkedMessage    int
+	expireTimeOfIncompleteChunk time.Duration
+	autoAckIncompleteChunk      bool
 }
 
 type partitionConsumer struct {
@@ -161,6 +165,9 @@ type partitionConsumer struct {
 	metrics              *internal.LeveledMetrics
 	decryptor            cryptointernal.Decryptor
 	schemaInfoCache      *schemaInfoCache
+
+	chunkedMsgCtxMap   *chunkedMsgCtxMap
+	unAckChunksTracker *unAckChunksTracker
 }
 
 type schemaInfoCache struct {
@@ -236,6 +243,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
 		schemaInfoCache:      newSchemaInfoCache(client, options.topic),
 		availablePermitsCh:   make(chan permitsReq, 10),
 	}
+	pc.chunkedMsgCtxMap = newChunkedMsgCtxMap(options.maxPendingChunkedMessage, pc)
+	pc.unAckChunksTracker = newUnAckChunksTracker(pc)
 	pc.setConsumerState(consumerInit)
 	pc.log = client.log.SubLogger(log.Fields{
 		"name":         pc.name,
@@ -378,18 +387,27 @@ func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error
 	return convertToMessageID(id), nil
 }
 
-func (pc *partitionConsumer) AckIDWithResponse(msgID trackingMessageID) error {
+func (pc *partitionConsumer) AckIDWithResponse(msgID MessageID) error {
 	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
 		pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
 		return errors.New("consumer state is closed")
 	}
 
+	if cmid, ok := toChunkedMessageID(msgID); ok {
+		return pc.unAckChunksTracker.ack(cmid)
+	}
+
+	trackingID, ok := toTrackingMessageID(msgID)
+	if !ok {
+		return errors.New("failed to convert trackingMessageID")
+	}
+
 	ackReq := new(ackRequest)
 	ackReq.doneCh = make(chan struct{})
-	if !msgID.Undefined() && msgID.ack() {
+	if !trackingID.Undefined() && trackingID.ack() {
 		pc.metrics.AcksCounter.Inc()
-		pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9)
-		ackReq.msgID = msgID
+		pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
+		ackReq.msgID = trackingID
 		// send ack request to eventsCh
 		pc.eventsCh <- ackReq
 		// wait for the request to complete
@@ -401,18 +419,27 @@ func (pc *partitionConsumer) AckIDWithResponse(msgID trackingMessageID) error {
 	return ackReq.err
 }
 
-func (pc *partitionConsumer) AckID(msgID trackingMessageID) error {
+func (pc *partitionConsumer) AckID(msgID MessageID) error {
 	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
 		pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
 		return errors.New("consumer state is closed")
 	}
 
+	if cmid, ok := toChunkedMessageID(msgID); ok {
+		return pc.unAckChunksTracker.ack(cmid)
+	}
+
+	trackingID, ok := toTrackingMessageID(msgID)
+	if !ok {
+		return errors.New("failed to convert trackingMessageID")
+	}
+
 	ackReq := new(ackRequest)
 	ackReq.doneCh = make(chan struct{})
-	if !msgID.Undefined() && msgID.ack() {
+	if !trackingID.Undefined() && trackingID.ack() {
 		pc.metrics.AcksCounter.Inc()
-		pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9)
-		ackReq.msgID = msgID
+		pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
+		ackReq.msgID = trackingID
 		// send ack request to eventsCh
 		pc.eventsCh <- ackReq
 		// No need to wait for ackReq.doneCh to finish
@@ -423,8 +450,18 @@ func (pc *partitionConsumer) AckID(msgID trackingMessageID) error {
 	return ackReq.err
 }
 
-func (pc *partitionConsumer) NackID(msgID trackingMessageID) {
-	pc.nackTracker.Add(msgID.messageID)
+func (pc *partitionConsumer) NackID(msgID MessageID) {
+	if cmid, ok := toChunkedMessageID(msgID); ok {
+		pc.unAckChunksTracker.nack(cmid)
+		return
+	}
+
+	trackingID, ok := toTrackingMessageID(msgID)
+	if !ok {
+		return
+	}
+
+	pc.nackTracker.Add(trackingID.messageID)
 	pc.metrics.NacksCounter.Inc()
 }
 
@@ -487,6 +524,9 @@ func (pc *partitionConsumer) Close() {
 		return
 	}
 
+	// close chunkedMsgCtxMap
+	pc.chunkedMsgCtxMap.Close()
+
 	req := &closeRequest{doneCh: make(chan struct{})}
 	pc.eventsCh <- req
 
@@ -494,15 +534,23 @@ func (pc *partitionConsumer) Close() {
 	<-req.doneCh
 }
 
-func (pc *partitionConsumer) Seek(msgID trackingMessageID) error {
+func (pc *partitionConsumer) Seek(msgID MessageID) error {
 	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
 		pc.log.WithField("state", state).Error("Failed to seek by closing or closed consumer")
 		return errors.New("failed to seek by closing or closed consumer")
 	}
 	req := &seekRequest{
 		doneCh: make(chan struct{}),
-		msgID:  msgID,
 	}
+	if cmid, ok := toChunkedMessageID(msgID); ok {
+		req.msgID = cmid.firstChunkID
+	} else if tmid, ok := toTrackingMessageID(msgID); ok {
+		req.msgID = tmid.messageID
+	} else {
+		// will never reach
+		return errors.New("unhandled messageID type")
+	}
+
 	pc.eventsCh <- req
 
 	// wait for the request to complete
@@ -512,7 +560,7 @@ func (pc *partitionConsumer) Seek(msgID trackingMessageID) error {
 
 func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
 	defer close(seek.doneCh)
-	seek.err = pc.requestSeek(seek.msgID.messageID)
+	seek.err = pc.requestSeek(seek.msgID)
 }
 func (pc *partitionConsumer) requestSeek(msgID messageID) error {
 	if err := pc.requestSeekWithoutClear(msgID); err != nil {
@@ -698,8 +746,21 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		}
 	}
 
+	isChunkedMsg := false
+	if msgMeta.GetNumChunksFromMsg() > 1 {
+		isChunkedMsg = true
+	}
+
+	processedPayloadBuffer := internal.NewBufferWrapper(decryptedPayload)
+	if isChunkedMsg {
+		processedPayloadBuffer = pc.processMessageChunk(processedPayloadBuffer, msgMeta, pbMsgID)
+		if processedPayloadBuffer == nil {
+			return nil
+		}
+	}
+
 	// decryption is success, decompress the payload
-	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, internal.NewBufferWrapper(decryptedPayload))
+	uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, processedPayloadBuffer)
 	if err != nil {
 		pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecompressionError)
 		return err
@@ -733,17 +794,40 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		pc.metrics.BytesReceived.Add(float64(len(payload)))
 		pc.metrics.PrefetchedBytes.Add(float64(len(payload)))
 
-		msgID := newTrackingMessageID(
+		trackingMsgID := newTrackingMessageID(
 			int64(pbMsgID.GetLedgerId()),
 			int64(pbMsgID.GetEntryId()),
 			int32(i),
 			pc.partitionIdx,
 			ackTracker)
+		// set the consumer so we know how to ack the message id
+		trackingMsgID.consumer = pc
 
-		if pc.messageShouldBeDiscarded(msgID) {
-			pc.AckID(msgID)
+		if pc.messageShouldBeDiscarded(trackingMsgID) {
+			pc.AckID(trackingMsgID)
 			continue
 		}
+
+		var msgID MessageID
+		if isChunkedMsg {
+			ctx := pc.chunkedMsgCtxMap.get(msgMeta.GetUuid())
+			if ctx == nil {
+				// chunkedMsgCtxMap has closed because of consumer closed
+				pc.log.Warnf("get chunkedMsgCtx for chunk with uuid %s failed because consumer has closed",
+					msgMeta.Uuid)
+				return nil
+			}
+			cmid := newChunkMessageID(ctx.firstChunkID(), ctx.lastChunkID())
+			// set the consumer so we know how to ack the message id
+			cmid.consumer = pc
+			// clean chunkedMsgCtxMap
+			pc.chunkedMsgCtxMap.remove(msgMeta.GetUuid())
+			pc.unAckChunksTracker.add(cmid, ctx.chunkedMsgIDs)
+			msgID = cmid
+		} else {
+			msgID = trackingMsgID
+		}
+
 		var messageIndex *uint64
 		var brokerPublishTime *time.Time
 		if brokerMetadata != nil {
@@ -756,8 +840,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 				brokerPublishTime = &aux
 			}
 		}
-		// set the consumer so we know how to ack the message id
-		msgID.consumer = pc
+
 		var msg *message
 		if smm != nil {
 			msg = &message{
@@ -813,6 +896,55 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 	return nil
 }
 
+func (pc *partitionConsumer) processMessageChunk(compressedPayload internal.Buffer,
+	msgMeta *pb.MessageMetadata,
+	pbMsgID *pb.MessageIdData) internal.Buffer {
+	uuid := msgMeta.GetUuid()
+	numChunks := msgMeta.GetNumChunksFromMsg()
+	totalChunksSize := int(msgMeta.GetTotalChunkMsgSize())
+	chunkID := msgMeta.GetChunkId()
+	msgID := messageID{
+		ledgerID:     int64(pbMsgID.GetLedgerId()),
+		entryID:      int64(pbMsgID.GetEntryId()),
+		batchIdx:     -1,
+		partitionIdx: pc.partitionIdx,
+	}
+
+	if msgMeta.GetChunkId() == 0 {
+		pc.chunkedMsgCtxMap.addIfAbsent(uuid,
+			numChunks,
+			totalChunksSize,
+		)
+	}
+
+	ctx := pc.chunkedMsgCtxMap.get(uuid)
+
+	if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID != ctx.lastChunkedMsgID+1 {
+		lastChunkedMsgID := -1
+		totalChunks := -1
+		if ctx != nil {
+			lastChunkedMsgID = int(ctx.lastChunkedMsgID)
+			totalChunks = int(ctx.totalChunks)
+			ctx.chunkedMsgBuffer.Clear()
+		}
+		pc.log.Warnf(fmt.Sprintf(
+			"Received unexpected chunk messageId %s, last-chunk-id %d, chunkId = %d, total-chunks %d",
+			msgID.String(), lastChunkedMsgID, chunkID, totalChunks))
+		pc.chunkedMsgCtxMap.remove(uuid)
+		pc.availablePermitsCh <- permitsInc
+		return nil
+	}
+
+	ctx.append(chunkID, msgID, compressedPayload)
+
+	if msgMeta.GetChunkId() != msgMeta.GetNumChunksFromMsg()-1 {
+		pc.availablePermitsCh <- permitsInc
+		return nil
+	}
+
+	return ctx.chunkedMsgBuffer
+}
+
 func (pc *partitionConsumer) messageShouldBeDiscarded(msgID trackingMessageID) bool {
 	if pc.startMessageID.Undefined() {
 		return false
@@ -1045,7 +1177,7 @@ type getLastMsgIDRequest struct {
 
 type seekRequest struct {
 	doneCh chan struct{}
-	msgID  trackingMessageID
+	msgID  messageID
 	err    error
 }
 
@@ -1508,3 +1640,227 @@ func convertToMessageID(id *pb.MessageIdData) trackingMessageID {
 
 	return msgID
 }
+
+type chunkedMsgCtx struct {
+	totalChunks      int32
+	chunkedMsgBuffer internal.Buffer
+	lastChunkedMsgID int32
+	chunkedMsgIDs    []messageID
+	receivedTime     int64
+
+	mu sync.Mutex
+}
+
+func newChunkedMsgCtx(numChunksFromMsg int32, totalChunkMsgSize int) *chunkedMsgCtx {
+	return &chunkedMsgCtx{
+		totalChunks:      numChunksFromMsg,
+		chunkedMsgBuffer: internal.NewBuffer(totalChunkMsgSize),
+		lastChunkedMsgID: -1,
+		chunkedMsgIDs:    make([]messageID, numChunksFromMsg),
+		receivedTime:     time.Now().Unix(),
+	}
+}
+
+func (c *chunkedMsgCtx) append(chunkID int32, msgID messageID, partPayload internal.Buffer) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.chunkedMsgIDs[chunkID] = msgID
+	c.chunkedMsgBuffer.Write(partPayload.ReadableSlice())
+	c.lastChunkedMsgID = chunkID
+}
+
+func (c *chunkedMsgCtx) firstChunkID() messageID {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if len(c.chunkedMsgIDs) == 0 {
+		return messageID{}
+	}
+	return c.chunkedMsgIDs[0]
+}
+
+func (c *chunkedMsgCtx) lastChunkID() messageID {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if len(c.chunkedMsgIDs) == 0 {
+		return messageID{}
+	}
+	return c.chunkedMsgIDs[len(c.chunkedMsgIDs)-1]
+}
+
+func (c *chunkedMsgCtx) discard(pc *partitionConsumer) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	for _, mid := range c.chunkedMsgIDs {
+		pc.log.Info("Removing chunk message-id", mid.String())
+		tmid, _ := toTrackingMessageID(mid)
+		pc.AckID(tmid)
+	}
+}
+
+type chunkedMsgCtxMap struct {
+	chunkedMsgCtxs map[string]*chunkedMsgCtx
+	pendingQueue   *list.List
+	maxPending     int
+	pc             *partitionConsumer
+	mu             sync.Mutex
+	closed         bool
+}
+
+func newChunkedMsgCtxMap(maxPending int, pc *partitionConsumer) *chunkedMsgCtxMap {
+	return &chunkedMsgCtxMap{
+		chunkedMsgCtxs: make(map[string]*chunkedMsgCtx, maxPending),
+		pendingQueue:   list.New(),
+		maxPending:     maxPending,
+		pc:             pc,
+		mu:             sync.Mutex{},
+	}
+}
+
+func (c *chunkedMsgCtxMap) addIfAbsent(uuid string, totalChunks int32, totalChunkMsgSize int) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return
+	}
+	if _, ok := c.chunkedMsgCtxs[uuid]; !ok {
+		c.chunkedMsgCtxs[uuid] = newChunkedMsgCtx(totalChunks, totalChunkMsgSize)
+		c.pendingQueue.PushBack(uuid)
+		go c.discardChunkIfExpire(uuid, true, c.pc.options.expireTimeOfIncompleteChunk)
+	}
+	if c.maxPending > 0 && c.pendingQueue.Len() > c.maxPending {
+		go c.discardOldestChunkMessage(c.pc.options.autoAckIncompleteChunk)
+	}
+}
+
+func (c *chunkedMsgCtxMap) get(uuid string) *chunkedMsgCtx {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return nil
+	}
+	return c.chunkedMsgCtxs[uuid]
+}
+
+func (c *chunkedMsgCtxMap) remove(uuid string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return
+	}
+	delete(c.chunkedMsgCtxs, uuid)
+	e := c.pendingQueue.Front()
+	for ; e != nil; e = e.Next() {
+		if e.Value.(string) == uuid {
+			c.pendingQueue.Remove(e)
+			break
+		}
+	}
+}
+
+func (c *chunkedMsgCtxMap) discardOldestChunkMessage(autoAck bool) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed || (c.maxPending > 0 && c.pendingQueue.Len() <= c.maxPending) {
+		return
+	}
+	oldest := c.pendingQueue.Front().Value.(string)
+	ctx, ok := c.chunkedMsgCtxs[oldest]
+	if !ok {
+		return
+	}
+	if autoAck {
+		ctx.discard(c.pc)
+	}
+	delete(c.chunkedMsgCtxs, oldest)
+	c.pc.log.Infof("Chunked message [%s] has been removed from chunkedMsgCtxMap", oldest)
+}
+
+func (c *chunkedMsgCtxMap) discardChunkMessage(uuid string, autoAck bool) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return
+	}
+	ctx, ok := c.chunkedMsgCtxs[uuid]
+	if !ok {
+		return
+	}
+	if autoAck {
+		ctx.discard(c.pc)
+	}
+	delete(c.chunkedMsgCtxs, uuid)
+	e := c.pendingQueue.Front()
+	for ; e != nil; e = e.Next() {
+		if e.Value.(string) == uuid {
+			c.pendingQueue.Remove(e)
+			break
+		}
+	}
+	c.pc.log.Infof("Chunked message [%s] has been removed from chunkedMsgCtxMap", uuid)
+}
+
+func (c *chunkedMsgCtxMap) discardChunkIfExpire(uuid string, autoAck bool, expire time.Duration) {
+	timer := time.NewTimer(expire)
+	<-timer.C
+	c.discardChunkMessage(uuid, autoAck)
+}
+
+func (c *chunkedMsgCtxMap) Close() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.closed = true
+}
+
+type unAckChunksTracker struct {
+	chunkIDs map[chunkMessageID][]messageID
+	pc       *partitionConsumer
+	mu       sync.Mutex
+}
+
+func newUnAckChunksTracker(pc *partitionConsumer) *unAckChunksTracker {
+	return &unAckChunksTracker{
+		chunkIDs: make(map[chunkMessageID][]messageID),
+		pc:       pc,
+	}
+}
+
+func (u *unAckChunksTracker) add(cmid chunkMessageID, ids []messageID) {
+	u.mu.Lock()
+	defer u.mu.Unlock()
+
+	u.chunkIDs[cmid] = ids
+}
+
+func (u *unAckChunksTracker) get(cmid chunkMessageID) []messageID {
+	u.mu.Lock()
+	defer u.mu.Unlock()
+
+	return u.chunkIDs[cmid]
+}
+
+func (u *unAckChunksTracker) remove(cmid chunkMessageID) {
+	u.mu.Lock()
+	defer u.mu.Unlock()
+
+	delete(u.chunkIDs, cmid)
+}
+
+func (u *unAckChunksTracker) ack(cmid chunkMessageID) error {
+	ids := u.get(cmid)
+	for _, id := range ids {
+		if err := u.pc.AckID(id); err != nil {
+			return err
+		}
+	}
+	u.remove(cmid)
+	return nil
+}
+
+func (u *unAckChunksTracker) nack(cmid chunkMessageID) {
+	ids := u.get(cmid)
+	for _, id := range ids {
+		u.pc.NackID(id)
+	}
+	u.remove(cmid)
+}
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index c55a1c1..55f3d7a 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -181,10 +181,10 @@ func (c *regexConsumer) AckID(msgID MessageID) error {
 	}
 
 	if c.options.AckWithResponse {
-		return mid.AckWithResponse()
+		return mid.consumer.AckIDWithResponse(msgID)
 	}
 
-	return mid.Ack()
+	return mid.consumer.AckID(msgID)
 }
 
 func (c *regexConsumer) Nack(msg Message) {
@@ -219,7 +219,7 @@ func (c *regexConsumer) NackID(msgID MessageID) {
 		return
 	}
 
-	mid.Nack()
+	mid.consumer.NackID(msgID)
 }
 
 func (c *regexConsumer) Close() {
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index d155ae7..11ecb40 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -210,11 +210,25 @@ func toTrackingMessageID(msgID MessageID) (trackingMessageID, bool) {
 		}, true
 	} else if mid, ok := msgID.(trackingMessageID); ok {
 		return mid, true
+	} else if cmid, ok := msgID.(chunkMessageID); ok {
+		return trackingMessageID{
+			messageID:    cmid.messageID,
+			receivedTime: cmid.receivedTime,
+			consumer:     cmid.consumer,
+		}, true
 	} else {
 		return trackingMessageID{}, false
 	}
 }
 
+func toChunkedMessageID(msgID MessageID) (chunkMessageID, bool) {
+	cid, ok := msgID.(chunkMessageID)
+	if ok {
+		return cid, true
+	}
+	return chunkMessageID{}, false
+}
+
 func timeFromUnixTimestampMillis(timestamp uint64) time.Time {
 	ts := int64(timestamp) * int64(time.Millisecond)
 	seconds := ts / int64(time.Second)
@@ -372,3 +386,41 @@ func (t *ackTracker) completed() bool {
 	defer t.Unlock()
 	return len(t.batchIDs.Bits()) == 0
 }
+
+type chunkMessageID struct {
+	messageID
+
+	firstChunkID messageID
+	receivedTime time.Time
+
+	consumer acker
+}
+
+func newChunkMessageID(firstChunkID messageID, lastChunkID messageID) chunkMessageID {
+	return chunkMessageID{
+		messageID:    lastChunkID,
+		firstChunkID: firstChunkID,
+		receivedTime: time.Now(),
+	}
+}
+
+func (id chunkMessageID) String() string {
+	return fmt.Sprintf("%s;%s", id.firstChunkID.String(), id.messageID.String())
+}
+
+func (id chunkMessageID) Serialize() []byte {
+	msgID := &pb.MessageIdData{
+		LedgerId:   proto.Uint64(uint64(id.ledgerID)),
+		EntryId:    proto.Uint64(uint64(id.entryID)),
+		BatchIndex: proto.Int32(id.batchIdx),
+		Partition:  proto.Int32(id.partitionIdx),
+		FirstChunkMessageId: &pb.MessageIdData{
+			LedgerId:   proto.Uint64(uint64(id.firstChunkID.ledgerID)),
+			EntryId:    proto.Uint64(uint64(id.firstChunkID.entryID)),
+			BatchIndex: proto.Int32(id.firstChunkID.batchIdx),
+			Partition:  proto.Int32(id.firstChunkID.partitionIdx),
+		},
+	}
+	data, _ := proto.Marshal(msgID)
+	return data
+}
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index fe8f628..ca19a3e 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -35,7 +35,7 @@ type BuffersPool interface {
 
 // BatcherBuilderProvider defines func which returns the BatchBuilder.
 type BatcherBuilderProvider func(
-	maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
+	maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64,
 	compressionType pb.CompressionType, level compression.Level,
 	bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
 ) (BatchBuilder, error)
@@ -85,6 +85,8 @@ type batchContainer struct {
 	// without needing costly re-allocations.
 	maxBatchSize uint
 
+	maxMessageSize uint32
+
 	producerName string
 	producerID   uint64
 
@@ -102,18 +104,19 @@ type batchContainer struct {
 
 // newBatchContainer init a batchContainer
 func newBatchContainer(
-	maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
+	maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64,
 	compressionType pb.CompressionType, level compression.Level,
 	bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
 ) batchContainer {
 
 	bc := batchContainer{
-		buffer:       NewBuffer(4096),
-		numMessages:  0,
-		maxMessages:  maxMessages,
-		maxBatchSize: maxBatchSize,
-		producerName: producerName,
-		producerID:   producerID,
+		buffer:         NewBuffer(4096),
+		numMessages:    0,
+		maxMessages:    maxMessages,
+		maxBatchSize:   maxBatchSize,
+		maxMessageSize: maxMessageSize,
+		producerName:   producerName,
+		producerID:     producerID,
 		cmdSend: baseCommand(
 			pb.BaseCommand_SEND,
 			&pb.CommandSend{
@@ -124,7 +127,7 @@ func newBatchContainer(
 			ProducerName: &producerName,
 		},
 		callbacks:           []interface{}{},
-		compressionProvider: getCompressionProvider(compressionType, level),
+		compressionProvider: GetCompressionProvider(compressionType, level),
 		buffersPool:         bufferPool,
 		log:                 logger,
 		encryptor:           encryptor,
@@ -139,13 +142,13 @@ func newBatchContainer(
 
 // NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
 func NewBatchBuilder(
-	maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
+	maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64,
 	compressionType pb.CompressionType, level compression.Level,
 	bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
 ) (BatchBuilder, error) {
 
 	bc := newBatchContainer(
-		maxMessages, maxBatchSize, producerName, producerID, compressionType,
+		maxMessages, maxBatchSize, maxMessageSize, producerName, producerID, compressionType,
 		level, bufferPool, logger, encryptor,
 	)
 
@@ -164,7 +167,9 @@ func (bc *batchContainer) hasSpace(payload []byte) bool {
 		return true
 	}
 	msgSize := uint32(len(payload))
-	return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
+	expectedSize := bc.buffer.ReadableBytes() + msgSize
+	return bc.numMessages+1 <= bc.maxMessages &&
+		expectedSize <= uint32(bc.maxBatchSize) && expectedSize <= bc.maxMessageSize
 }
 
 func (bc *batchContainer) hasSameSchema(schemaVersion []byte) bool {
@@ -258,8 +263,9 @@ func (bc *batchContainer) Flush() (
 		buffer = NewBuffer(int(uncompressedSize * 3 / 2))
 	}
 
-	if err = serializeBatch(
-		buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, bc.compressionProvider, bc.encryptor,
+	if err = serializeMessage(
+		buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, bc.compressionProvider,
+		bc.encryptor, bc.maxMessageSize, true,
 	); err == nil { // no error in serializing Batch
 		sequenceID = bc.cmdSend.Send.GetSequenceId()
 	}
@@ -285,7 +291,7 @@ func (bc *batchContainer) Close() error {
 	return bc.compressionProvider.Close()
 }
 
-func getCompressionProvider(
+func GetCompressionProvider(
 	compressionType pb.CompressionType,
 	level compression.Level,
 ) compression.Provider {
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 94217a5..c28593c 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -50,6 +50,8 @@ var ErrEOM = errors.New("EOF")
 
 var ErrConnectionClosed = errors.New("connection closed")
 
+var ErrExceedMaxMessageSize = errors.New("encryptedPayload exceeds MaxMessageSize")
+
 func NewMessageReader(headersAndPayload Buffer) *MessageReader {
 	return &MessageReader{
 		buffer: headersAndPayload,
@@ -237,17 +239,24 @@ func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload [
 	wb.Write(payload)
 }
 
-func serializeBatch(wb Buffer,
+func serializeMessage(wb Buffer,
 	cmdSend *pb.BaseCommand,
 	msgMetadata *pb.MessageMetadata,
-	uncompressedPayload Buffer,
+	payload Buffer,
 	compressionProvider compression.Provider,
-	encryptor crypto.Encryptor) error {
+	encryptor crypto.Encryptor,
+	maxMessageSize uint32,
+	doCompress bool) error {
 	// Wire format
 	// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
 
 	// compress the payload
-	compressedPayload := compressionProvider.Compress(nil, uncompressedPayload.ReadableSlice())
+	var compressedPayload []byte
+	if doCompress {
+		compressedPayload = compressionProvider.Compress(nil, payload.ReadableSlice())
+	} else {
+		compressedPayload = payload.ReadableSlice()
+	}
 
 	// encrypt the compressed payload
 	encryptedPayload, err := encryptor.Encrypt(compressedPayload, msgMetadata)
@@ -258,6 +267,13 @@ func serializeBatch(wb Buffer,
 
 	cmdSize := uint32(proto.Size(cmdSend))
 	msgMetadataSize := uint32(proto.Size(msgMetadata))
+	msgSize := len(encryptedPayload) + int(msgMetadataSize)
+
+	// the maxMessageSize check for batched messages is done here
+	if !(msgMetadata.GetTotalChunkMsgSize() != 0) && msgSize > int(maxMessageSize) {
+		return fmt.Errorf("%w, size: %d, MaxMessageSize: %d",
+			ErrExceedMaxMessageSize, msgSize, maxMessageSize)
+	}
 
 	frameSizeIdx := wb.WriterIndex()
 	wb.WriteUint32(0) // Skip frame size until we now the size
@@ -300,6 +316,28 @@ func serializeBatch(wb Buffer,
 	return nil
 }
 
+func SingleSend(wb Buffer,
+	producerID, sequenceID uint64,
+	msgMetadata *pb.MessageMetadata,
+	compressedPayload Buffer,
+	encryptor crypto.Encryptor,
+	maxMassageSize uint32) error {
+	cmdSend := baseCommand(
+		pb.BaseCommand_SEND,
+		&pb.CommandSend{
+			ProducerId: &producerID,
+		},
+	)
+	cmdSend.Send.SequenceId = &sequenceID
+	if msgMetadata.GetTotalChunkMsgSize() > 1 {
+		isChunk := true
+		cmdSend.Send.IsChunk = &isChunk
+	}
+	// payload has been compressed so compressionProvider can be nil
+	return serializeMessage(wb, cmdSend, msgMetadata, compressedPayload,
+		nil, encryptor, maxMassageSize, false)
+}
+
 // ConvertFromStringMap convert a string map to a KeyValue []byte
 func ConvertFromStringMap(m map[string]string) []*pb.KeyValue {
 	list := make([]*pb.KeyValue, len(m))
diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go
index 77fbb8c..334e674 100644
--- a/pulsar/internal/key_based_batch_builder.go
+++ b/pulsar/internal/key_based_batch_builder.go
@@ -84,7 +84,7 @@ func (h *keyBasedBatches) Val(key string) *batchContainer {
 // NewKeyBasedBatchBuilder init batch builder and return BatchBuilder
 // pointer. Build a new key based batch message container.
 func NewKeyBasedBatchBuilder(
-	maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
+	maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64,
 	compressionType pb.CompressionType, level compression.Level,
 	bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
 ) (BatchBuilder, error) {
@@ -92,7 +92,7 @@ func NewKeyBasedBatchBuilder(
 	bb := &keyBasedBatchContainer{
 		batches: newKeyBasedBatches(),
 		batchContainer: newBatchContainer(
-			maxMessages, maxBatchSize, producerName, producerID,
+			maxMessages, maxBatchSize, maxMessageSize, producerName, producerID,
 			compressionType, level, bufferPool, logger, encryptor,
 		),
 		compressionType: compressionType,
@@ -151,7 +151,7 @@ func (bc *keyBasedBatchContainer) Add(
 	if batchPart == nil {
 		// create batchContainer for new key
 		t := newBatchContainer(
-			bc.maxMessages, bc.maxBatchSize, bc.producerName, bc.producerID,
+			bc.maxMessages, bc.maxBatchSize, bc.maxMessageSize, bc.producerName, bc.producerID,
 			bc.compressionType, bc.level, bc.buffersPool, bc.log, bc.encryptor,
 		)
 		batchPart = &t
diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go
new file mode 100644
index 0000000..b3d64af
--- /dev/null
+++ b/pulsar/message_chunking_test.go
@@ -0,0 +1,570 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"math/rand"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+var _brokerMaxMessageSize = 1024 * 1024
+
+func TestInvalidChunkingConfig(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           newTopicName(),
+		DisableBatching: false,
+		EnableChunking:  true,
+	})
+
+	assert.Error(t, err, "producer creation should have failed")
+	assert.Nil(t, producer)
+}
+
+func TestLargeMessage(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	// create producer without ChunkMaxMessageSize
+	producer1, err := client.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: true,
+		EnableChunking:  true,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer1)
+	defer producer1.Close()
+
+	// create producer with ChunkMaxMessageSize
+	producer2, err := client.CreateProducer(ProducerOptions{
+		Topic:               topic,
+		DisableBatching:     true,
+		EnableChunking:      true,
+		ChunkMaxMessageSize: 5,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer2)
+	defer producer2.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		Type:             Exclusive,
+		SubscriptionName: "chunk-subscriber",
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, consumer)
+	defer consumer.Close()
+
+	expectMsgs := make([][]byte, 0, 10)
+
+	// test send chunk with serverMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		msg := createTestMessagePayload(_brokerMaxMessageSize + 1)
+		expectMsgs = append(expectMsgs, msg)
+		ID, err := producer1.Send(context.Background(), &ProducerMessage{
+			Payload: msg,
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, ID)
+	}
+
+	// test receive chunk with serverMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		assert.NoError(t, err)
+
+		expectMsg := expectMsgs[i]
+
+		assert.Equal(t, expectMsg, msg.Payload())
+		// ack message
+		err = consumer.Ack(msg)
+		assert.NoError(t, err)
+	}
+
+	// test send chunk with ChunkMaxMessageSize limit
+	for i := 0; i < 5; i++ {
+		msg := createTestMessagePayload(50)
+		expectMsgs = append(expectMsgs, msg)
+		ID, err := producer2.Send(context.Background(), &ProducerMessage{
+			Payload: msg,
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, ID)
+	}
+
+	// test receive chunk with ChunkMaxMessageSize limit
+	for i := 5; i < 10; i++ {
+		msg, err := consumer.Receive(context.Background())
+		assert.NoError(t, err)
+
+		expectMsg := expectMsgs[i]
+
+		assert.Equal(t, expectMsg, msg.Payload())
+		// ack message
+		err = consumer.Ack(msg)
+		assert.NoError(t, err)
+	}
+}
+
+func TestMaxPendingChunkMessages(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	totalProducers := 5
+	producers := make([]Producer, 0, 20)
+	defer func() {
+		for _, p := range producers {
+			p.Close()
+		}
+	}()
+
+	clients := make([]Client, 0, 20)
+	defer func() {
+		for _, c := range clients {
+			c.Close()
+		}
+	}()
+
+	for j := 0; j < totalProducers; j++ {
+		pc, err := NewClient(ClientOptions{
+			URL: lookupURL,
+		})
+		assert.Nil(t, err)
+		clients = append(clients, pc)
+		producer, err := pc.CreateProducer(ProducerOptions{
+			Topic:               topic,
+			DisableBatching:     true,
+			EnableChunking:      true,
+			ChunkMaxMessageSize: 10,
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, producer)
+		producers = append(producers, producer)
+	}
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                    topic,
+		Type:                     Exclusive,
+		SubscriptionName:         "chunk-subscriber",
+		MaxPendingChunkedMessage: 1,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, consumer)
+	defer consumer.Close()
+
+	totalMsgs := 40
+	wg := sync.WaitGroup{}
+	wg.Add(totalMsgs * totalProducers)
+	for i := 0; i < totalMsgs; i++ {
+		for j := 0; j < totalProducers; j++ {
+			p := producers[j]
+			go func() {
+				ID, err := p.Send(context.Background(), &ProducerMessage{
+					Payload: createTestMessagePayload(50),
+				})
+				assert.NoError(t, err)
+				assert.NotNil(t, ID)
+				wg.Done()
+			}()
+		}
+	}
+	wg.Wait()
+
+	received := 0
+	for i := 0; i < totalMsgs*totalProducers; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		if msg == nil || (err != nil && errors.Is(err, context.DeadlineExceeded)) {
+			break
+		}
+
+		received++
+
+		err = consumer.Ack(msg)
+		assert.NoError(t, err)
+	}
+
+	assert.NotEqual(t, totalMsgs*totalProducers, received)
+}
+
+func TestExpireIncompleteChunks(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	c, err := client.Subscribe(ConsumerOptions{
+		Topic:                       topic,
+		Type:                        Exclusive,
+		SubscriptionName:            "chunk-subscriber",
+		ExpireTimeOfIncompleteChunk: time.Millisecond * 300,
+	})
+	assert.NoError(t, err)
+	defer c.Close()
+
+	uuid := "test-uuid"
+	chunkCtxMap := c.(*consumer).consumers[0].chunkedMsgCtxMap
+	chunkCtxMap.addIfAbsent(uuid, 2, 100)
+	ctx := chunkCtxMap.get(uuid)
+	assert.NotNil(t, ctx)
+
+	time.Sleep(400 * time.Millisecond)
+
+	ctx = chunkCtxMap.get(uuid)
+	assert.Nil(t, ctx)
+}
+
+func TestChunksEnqueueFailed(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:                   topic,
+		EnableChunking:          true,
+		DisableBatching:         true,
+		MaxPendingMessages:      10,
+		ChunkMaxMessageSize:     50,
+		DisableBlockIfQueueFull: true,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	ID, err := producer.Send(context.Background(), &ProducerMessage{
+		Payload: createTestMessagePayload(1000),
+	})
+	assert.Error(t, err)
+	assert.Nil(t, ID)
+}
+
+func TestSeekChunkMessages(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+	totalMessages := 5
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:               topic,
+		EnableChunking:      true,
+		DisableBatching:     true,
+		ChunkMaxMessageSize: 50,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		Type:             Exclusive,
+		SubscriptionName: "default-seek",
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, consumer)
+	defer consumer.Close()
+
+	msgIDs := make([]MessageID, 0)
+	for i := 0; i < totalMessages; i++ {
+		ID, err := producer.Send(context.Background(), &ProducerMessage{
+			Payload: createTestMessagePayload(100),
+		})
+		assert.NoError(t, err)
+		msgIDs = append(msgIDs, ID)
+	}
+
+	for i := 0; i < totalMessages; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		assert.NoError(t, err)
+		assert.NotNil(t, msg)
+		assert.Equal(t, msgIDs[i].Serialize(), msg.ID().Serialize())
+	}
+
+	err = consumer.Seek(msgIDs[1])
+	assert.NoError(t, err)
+
+	for i := 1; i < totalMessages; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+		msg, err := consumer.Receive(ctx)
+		cancel()
+		assert.NoError(t, err)
+		assert.NotNil(t, msg)
+		assert.Equal(t, msgIDs[i].Serialize(), msg.ID().Serialize())
+	}
+
+	// todo: add a reader seek test once the reader supports reading chunked messages
+}
+
+func TestChunkAckAndNAck(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:               topic,
+		EnableChunking:      true,
+		DisableBatching:     true,
+		ChunkMaxMessageSize: 50,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:               topic,
+		Type:                Exclusive,
+		SubscriptionName:    "default-seek",
+		NackRedeliveryDelay: time.Second,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, consumer)
+	defer consumer.Close()
+
+	content := createTestMessagePayload(100)
+
+	_, err = producer.Send(context.Background(), &ProducerMessage{
+		Payload: content,
+	})
+	assert.NoError(t, err)
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+	msg, err := consumer.Receive(ctx)
+	cancel()
+	assert.NoError(t, err)
+	assert.NotNil(t, msg)
+	assert.Equal(t, msg.Payload(), content)
+
+	consumer.Nack(msg)
+	time.Sleep(time.Second * 2)
+
+	ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
+	msg, err = consumer.Receive(ctx)
+	cancel()
+	assert.NoError(t, err)
+	assert.NotNil(t, msg)
+	assert.Equal(t, msg.Payload(), content)
+}
+
+func TestChunkSize(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	// the default message metadata size for string schema
+	// The proto messageMetaData size as following. (all with tag) (maxMessageSize = 1024 * 1024)
+	// | producerName | sequenceID | publishTime | uncompressedSize |
+	// | ------------ | ---------- | ----------- | ---------------- |
+	// | 6            | 2          | 7           | 4                |
+	payloadChunkSize := _brokerMaxMessageSize - 19
+
+	topic := newTopicName()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Name:            "test",
+		Topic:           topic,
+		EnableChunking:  true,
+		DisableBatching: true,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	for size := payloadChunkSize; size <= _brokerMaxMessageSize; size++ {
+		msgID, err := producer.Send(context.Background(), &ProducerMessage{
+			Payload: createTestMessagePayload(size),
+		})
+		assert.NoError(t, err)
+		if size <= payloadChunkSize {
+			_, ok := msgID.(messageID)
+			assert.Equal(t, true, ok)
+		} else {
+			_, ok := msgID.(chunkMessageID)
+			assert.Equal(t, true, ok)
+		}
+	}
+}
+
+func TestChunkMultiTopicConsumerReceive(t *testing.T) {
+	topic1 := newTopicName()
+	topic2 := newTopicName()
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	topics := []string{topic1, topic2}
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topics:           topics,
+		SubscriptionName: "multi-topic-sub",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer consumer.Close()
+
+	maxSize := 50
+
+	// produce messages
+	for i, topic := range topics {
+		p, err := client.CreateProducer(ProducerOptions{
+			Topic:               topic,
+			DisableBatching:     true,
+			EnableChunking:      true,
+			ChunkMaxMessageSize: uint(maxSize),
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+		err = genMessages(p, 10, func(idx int) string {
+			return fmt.Sprintf("topic-%d-hello-%d-%s", i+1, idx, string(createTestMessagePayload(100)))
+		})
+		p.Close()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	receivedTopic1 := 0
+	receivedTopic2 := 0
+	// nolint
+	for receivedTopic1+receivedTopic2 < 20 {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+		select {
+		case cm, ok := <-consumer.Chan():
+			if ok {
+				msg := string(cm.Payload())
+				if strings.HasPrefix(msg, "topic-1") {
+					receivedTopic1++
+				} else if strings.HasPrefix(msg, "topic-2") {
+					receivedTopic2++
+				}
+				consumer.Ack(cm.Message)
+			} else {
+				t.Fail()
+			}
+		case <-ctx.Done():
+			t.Error(ctx.Err())
+		}
+		cancel()
+	}
+	assert.Equal(t, receivedTopic1, receivedTopic2)
+}
+
+func TestChunkBlockIfQueueFull(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	topic := newTopicName()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Name:                "test",
+		Topic:               topic,
+		EnableChunking:      true,
+		DisableBatching:     true,
+		MaxPendingMessages:  1,
+		ChunkMaxMessageSize: 10,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	// Large messages will be split into 11 chunks, exceeding the length of the pending queue
+	ID, err := producer.Send(context.Background(), &ProducerMessage{
+		Payload: createTestMessagePayload(100),
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, ID)
+}
+
+func createTestMessagePayload(size int) []byte {
+	payload := make([]byte, size)
+	for i := range payload {
+		payload[i] = byte(rand.Intn(100))
+	}
+	return payload
+}
diff --git a/pulsar/producer.go b/pulsar/producer.go
index b4e43bd..d088fb2 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -178,6 +178,15 @@ type ProducerOptions struct {
 
 	// Encryption specifies the fields required to encrypt a message
 	Encryption *ProducerEncryptionInfo
+
+	// EnableChunking controls whether automatic chunking of messages is enabled for the producer. By default, chunking
+	// is disabled.
+	// Chunking can not be enabled when batching is enabled.
+	EnableChunking bool
+
+	// ChunkMaxMessageSize is the max size of single chunk payload.
+	// It will actually only take effect if it is smaller than the maxMessageSize from the broker.
+	ChunkMaxMessageSize uint
 }
 
 // Producer is used to publish messages on a topic
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 9bbfccb..3c45b59 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -94,6 +94,10 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
 		options.PartitionsAutoDiscoveryInterval = defaultPartitionsAutoDiscoveryInterval
 	}
 
+	if !options.DisableBatching && options.EnableChunking {
+		return nil, fmt.Errorf("batching and chunking can not be enabled together")
+	}
+
 	p := &producer{
 		options: options,
 		topic:   options.Topic,
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 922d89d..881451c 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -20,6 +20,9 @@ package pulsar
 import (
 	"context"
 	"errors"
+	"fmt"
+	"math"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -53,6 +56,7 @@ var (
 	errSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full")
 	errContextExpired  = newError(TimeoutError, "message send context expired")
 	errMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize")
+	errMetaTooLarge    = newError(InvalidMessage, "message metadata size exceeds MaxMessageSize")
 	errProducerClosed  = newError(ProducerClosed, "producer already been closed")
 
 	buffersPool sync.Pool
@@ -75,6 +79,8 @@ type partitionProducer struct {
 	batchBuilder             internal.BatchBuilder
 	sequenceIDGenerator      *uint64
 	batchFlushTicker         *time.Ticker
+	encryptor                internalcrypto.Encryptor
+	compressionProvider      compression.Provider
 
 	// Channel where app is posting messages to be published
 	eventsChan      chan interface{}
@@ -146,6 +152,8 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 		connectClosedCh:  make(chan connectionClosed, 10),
 		closeCh:          make(chan struct{}),
 		batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
+		compressionProvider: internal.GetCompressionProvider(pb.CompressionType(options.CompressionType),
+			compression.Level(options.CompressionLevel)),
 		publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
 		pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
 		lastSequenceID:   -1,
@@ -246,42 +254,13 @@ func (p *partitionProducer) grabCnx() error {
 
 	p.producerName = res.Response.ProducerSuccess.GetProducerName()
 
-	var encryptor internalcrypto.Encryptor
 	if p.options.Encryption != nil {
-		encryptor = internalcrypto.NewProducerEncryptor(p.options.Encryption.Keys,
+		p.encryptor = internalcrypto.NewProducerEncryptor(p.options.Encryption.Keys,
 			p.options.Encryption.KeyReader,
 			p.options.Encryption.MessageCrypto,
 			p.options.Encryption.ProducerCryptoFailureAction, p.log)
 	} else {
-		encryptor = internalcrypto.NewNoopEncryptor()
-	}
-
-	if p.options.DisableBatching {
-		provider, _ := GetBatcherBuilderProvider(DefaultBatchBuilder)
-		p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
-			p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
-			compression.Level(p.options.CompressionLevel),
-			p,
-			p.log,
-			encryptor)
-		if err != nil {
-			return err
-		}
-	} else if p.batchBuilder == nil {
-		provider, err := GetBatcherBuilderProvider(p.options.BatcherBuilderType)
-		if err != nil {
-			provider, _ = GetBatcherBuilderProvider(DefaultBatchBuilder)
-		}
-
-		p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
-			p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
-			compression.Level(p.options.CompressionLevel),
-			p,
-			p.log,
-			encryptor)
-		if err != nil {
-			return err
-		}
+		p.encryptor = internalcrypto.NewNoopEncryptor()
 	}
 
 	if p.sequenceIDGenerator == nil {
@@ -299,6 +278,24 @@ func (p *partitionProducer) grabCnx() error {
 	if err != nil {
 		return err
 	}
+
+	if !p.options.DisableBatching && p.batchBuilder == nil {
+		provider, err := GetBatcherBuilderProvider(p.options.BatcherBuilderType)
+		if err != nil {
+			return err
+		}
+		maxMessageSize := uint32(p._getConn().GetMaxMessageSize())
+		p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
+			maxMessageSize, p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
+			compression.Level(p.options.CompressionLevel),
+			p,
+			p.log,
+			p.encryptor)
+		if err != nil {
+			return err
+		}
+	}
+
 	p.log.WithFields(log.Fields{
 		"cnx":   res.Cnx.ID(),
 		"epoch": atomic.LoadUint64(&p.epoch),
@@ -323,7 +320,7 @@ func (p *partitionProducer) grabCnx() error {
 			pi.sentAt = time.Now()
 			pi.Unlock()
 			p.pendingQueue.Put(pi)
-			p._getConn().WriteData(pi.batchData)
+			p._getConn().WriteData(pi.buffer)
 
 			if pi == lastViewItem {
 				break
@@ -472,12 +469,12 @@ func (p *partitionProducer) Name() string {
 }
 
 func (p *partitionProducer) internalSend(request *sendRequest) {
-	p.log.Debug("Received send request: ", *request)
+	p.log.Debug("Received send request: ", *request.msg)
 
 	msg := request.msg
 
 	// read payload from message
-	payload := msg.Payload
+	uncompressedPayload := msg.Payload
 
 	var schemaPayload []byte
 	var err error
@@ -486,9 +483,16 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		return
 	}
 
+	// The block channel must be closed whenever this method returns on an error path
+	defer request.stopBlock()
+	if !p.canAddToQueue(request) {
+		return
+	}
+
 	if p.options.DisableMultiSchema {
 		if msg.Schema != nil && p.options.Schema != nil &&
 			msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
+			p.publishSemaphore.Release()
 			p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
 			return
 		}
@@ -503,7 +507,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	if msg.Value != nil {
 		// payload and schema are mutually exclusive
 		// try to get payload from schema value only if payload is not set
-		if payload == nil && schema != nil {
+		if uncompressedPayload == nil && schema != nil {
 			schemaPayload, err = schema.Encode(msg.Value)
 			if err != nil {
 				p.publishSemaphore.Release()
@@ -513,14 +517,16 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 			}
 		}
 	}
-	if payload == nil {
-		payload = schemaPayload
+	if uncompressedPayload == nil {
+		uncompressedPayload = schemaPayload
 	}
+
 	if schema != nil {
 		schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
 		if schemaVersion == nil {
 			schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
 			if err != nil {
+				p.publishSemaphore.Release()
 				p.log.WithError(err).Error("get schema version fail")
 				return
 			}
@@ -528,29 +534,196 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		}
 	}
 
-	// if msg is too large
-	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+	uncompressedSize := len(uncompressedPayload)
+
+	deliverAt := msg.DeliverAt
+	if msg.DeliverAfter.Nanoseconds() > 0 {
+		deliverAt = time.Now().Add(msg.DeliverAfter)
+	}
+
+	mm := p.genMetadata(msg, uncompressedSize, deliverAt)
+
+	// set default ReplicationClusters when DisableReplication
+	if msg.DisableReplication {
+		msg.ReplicationClusters = []string{"__local__"}
+	}
+
+	sendAsBatch := !p.options.DisableBatching &&
+		msg.ReplicationClusters == nil &&
+		deliverAt.UnixNano() < 0
+
+	// When sending as a batch, blockCh can be closed early so the caller stops blocking
+	if sendAsBatch {
+		request.stopBlock()
+	} else {
+		// update the sequence ID in the metadata so the size of msgMetadata is accurate;
+		// for batched sends the sequence ID is updated in the BatchBuilder instead
+		p.updateMetadataSeqID(mm, msg)
+	}
+
+	maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+	// compress payload if not batching
+	var compressedPayload []byte
+	var compressedSize int
+	var checkSize int
+	if !sendAsBatch {
+		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
+		compressedSize = len(compressedPayload)
+		checkSize = compressedSize
+	} else {
+		// the final size check for batched messages happens in serializeMessage;
+		// this is only a preliminary check
+		checkSize = uncompressedSize
+	}
+
+	// if msg is too large and chunking is disabled
+	if checkSize > maxMessageSize && !p.options.EnableChunking {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
-			WithField("size", len(payload)).
+			WithField("size", checkSize).
 			WithField("properties", msg.Properties).
-			Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			Errorf("MaxMessageSize %d", maxMessageSize)
 		p.metrics.PublishErrorsMsgTooLarge.Inc()
 		return
 	}
 
-	deliverAt := msg.DeliverAt
-	if msg.DeliverAfter.Nanoseconds() > 0 {
-		deliverAt = time.Now().Add(msg.DeliverAfter)
+	var totalChunks int
+	// max chunk payload size
+	var payloadChunkSize int
+	if sendAsBatch || !p.options.EnableChunking {
+		totalChunks = 1
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+	} else {
+		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - mm.Size()
+		if payloadChunkSize <= 0 {
+			p.publishSemaphore.Release()
+			request.callback(nil, msg, errMetaTooLarge)
+			p.log.WithError(errMetaTooLarge).
+				WithField("metadata size", mm.Size()).
+				WithField("properties", msg.Properties).
+				Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
+			p.metrics.PublishErrorsMsgTooLarge.Inc()
+			return
+		}
+		// apply ChunkMaxMessageSize if it is configured
+		if p.options.ChunkMaxMessageSize != 0 {
+			payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
+		}
+		totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
 	}
 
-	sendAsBatch := !p.options.DisableBatching &&
-		msg.ReplicationClusters == nil &&
-		deliverAt.UnixNano() < 0
+	// record the total number of chunks on the send request
+	request.totalChunks = totalChunks
+
+	if !sendAsBatch {
+		if totalChunks > 1 {
+			var lhs, rhs int
+			uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10))
+			mm.Uuid = proto.String(uuid)
+			mm.NumChunksFromMsg = proto.Int(totalChunks)
+			mm.TotalChunkMsgSize = proto.Int(compressedSize)
+			cr := newChunkRecorder()
+			for chunkID := 0; chunkID < totalChunks; chunkID++ {
+				lhs = chunkID * payloadChunkSize
+				if rhs = lhs + payloadChunkSize; rhs > compressedSize {
+					rhs = compressedSize
+				}
+				// update chunk id
+				mm.ChunkId = proto.Int(chunkID)
+				nsr := &sendRequest{
+					ctx:              request.ctx,
+					msg:              request.msg,
+					callback:         request.callback,
+					callbackOnce:     request.callbackOnce,
+					publishTime:      request.publishTime,
+					blockCh:          request.blockCh,
+					closeBlockChOnce: request.closeBlockChOnce,
+					totalChunks:      totalChunks,
+					chunkID:          chunkID,
+					uuid:             uuid,
+					chunkRecorder:    cr,
+				}
+				// the permit for the first chunk has already been acquired
+				if chunkID != 0 && !p.canAddToQueue(nsr) {
+					return
+				}
+				p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
+			}
+			// close blockCh once all chunks have acquired their permits
+			request.stopBlock()
+		} else {
+			// close blockCh when totalChunks is 1 (its permit has already been acquired)
+			request.stopBlock()
+			p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
+		}
+	} else {
+		smm := p.genSingleMessageMetadataInBatch(msg, uncompressedSize)
+		multiSchemaEnabled := !p.options.DisableMultiSchema
+		added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+			msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled)
+		if !added {
+			// The current batch is full. Flush it and retry
+
+			p.internalFlushCurrentBatch()
+
+			// after flushing try again to add the current payload
+			if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+				msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled); !ok {
+				p.publishSemaphore.Release()
+				request.callback(nil, request.msg, errFailAddToBatch)
+				p.log.WithField("size", uncompressedSize).
+					WithField("properties", msg.Properties).
+					Error("unable to add message to batch")
+				return
+			}
+		}
+		if request.flushImmediately {
 
-	smm := &pb.SingleMessageMetadata{
-		PayloadSize: proto.Int(len(payload)),
+			p.internalFlushCurrentBatch()
+
+		}
+	}
+}
+
+func (p *partitionProducer) genMetadata(msg *ProducerMessage,
+	uncompressedSize int,
+	deliverAt time.Time) (mm *pb.MessageMetadata) {
+	mm = &pb.MessageMetadata{
+		ProducerName:     &p.producerName,
+		PublishTime:      proto.Uint64(internal.TimestampMillis(time.Now())),
+		ReplicateTo:      msg.ReplicationClusters,
+		UncompressedSize: proto.Uint32(uint32(uncompressedSize)),
+	}
+
+	if msg.Key != "" {
+		mm.PartitionKey = proto.String(msg.Key)
+	}
+
+	if msg.Properties != nil {
+		mm.Properties = internal.ConvertFromStringMap(msg.Properties)
+	}
+
+	if deliverAt.UnixNano() > 0 {
+		mm.DeliverAtTime = proto.Int64(int64(internal.TimestampMillis(deliverAt)))
+	}
+
+	return
+}
+
+func (p *partitionProducer) updateMetadataSeqID(mm *pb.MessageMetadata, msg *ProducerMessage) {
+	if msg.SequenceID != nil {
+		mm.SequenceId = proto.Uint64(uint64(*msg.SequenceID))
+	} else {
+		mm.SequenceId = proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
+	}
+}
+
+func (p *partitionProducer) genSingleMessageMetadataInBatch(msg *ProducerMessage,
+	uncompressedSize int) (smm *pb.SingleMessageMetadata) {
+	smm = &pb.SingleMessageMetadata{
+		PayloadSize: proto.Int(uncompressedSize),
 	}
 
 	if !msg.EventTime.IsZero() {
@@ -569,49 +742,61 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		smm.Properties = internal.ConvertFromStringMap(msg.Properties)
 	}
 
+	var sequenceID uint64
 	if msg.SequenceID != nil {
-		sequenceID := uint64(*msg.SequenceID)
-		smm.SequenceId = proto.Uint64(sequenceID)
+		sequenceID = uint64(*msg.SequenceID)
+	} else {
+		sequenceID = internal.GetAndAdd(p.sequenceIDGenerator, 1)
 	}
 
-	if !sendAsBatch {
-		p.internalFlushCurrentBatch()
-	}
+	smm.SequenceId = proto.Uint64(sequenceID)
 
-	if msg.DisableReplication {
-		msg.ReplicationClusters = []string{"__local__"}
-	}
-	multiSchemaEnabled := !p.options.DisableMultiSchema
-	added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
-		msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled)
-	if !added {
-		// The current batch is full.. flush it and retry
+	return
+}
 
-		p.internalFlushCurrentBatch()
+func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
+	compressedPayload []byte,
+	request *sendRequest,
+	maxMessageSize uint32) {
+	msg := request.msg
 
-		// after flushing try again to add the current payload
-		if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
-			msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled); !ok {
-			p.publishSemaphore.Release()
-			request.callback(nil, request.msg, errFailAddToBatch)
-			p.log.WithField("size", len(payload)).
-				WithField("properties", msg.Properties).
-				Error("unable to add message to batch")
-			return
-		}
-	}
+	payloadBuf := internal.NewBuffer(len(compressedPayload))
+	payloadBuf.Write(compressedPayload)
 
-	if !sendAsBatch || request.flushImmediately {
+	buffer := p.GetBuffer()
+	if buffer == nil {
+		buffer = internal.NewBuffer(int(payloadBuf.ReadableBytes() * 3 / 2))
+	}
 
-		p.internalFlushCurrentBatch()
+	sid := *mm.SequenceId
 
+	if err := internal.SingleSend(
+		buffer,
+		p.producerID,
+		sid,
+		mm,
+		payloadBuf,
+		p.encryptor,
+		maxMessageSize,
+	); err != nil {
+		request.callback(nil, request.msg, err)
+		p.publishSemaphore.Release()
+		p.log.WithError(err).Errorf("Single message serialize failed %s", msg.Value)
+		return
 	}
 
+	p.pendingQueue.Put(&pendingItem{
+		sentAt:       time.Now(),
+		buffer:       buffer,
+		sequenceID:   sid,
+		sendRequests: []interface{}{request},
+	})
+	p._getConn().WriteData(buffer)
 }
 
 type pendingItem struct {
 	sync.Mutex
-	batchData    internal.Buffer
+	buffer       internal.Buffer
 	sequenceID   uint64
 	sentAt       time.Time
 	sendRequests []interface{}
@@ -637,12 +822,18 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 				sr.callback(nil, sr.msg, err)
 			}
 		}
+		if errors.Is(err, internal.ErrExceedMaxMessageSize) {
+			p.log.WithError(errMessageTooLarge).
+				Errorf("internal err: %s", err)
+			p.metrics.PublishErrorsMsgTooLarge.Inc()
+			return
+		}
 		return
 	}
 
 	p.pendingQueue.Put(&pendingItem{
 		sentAt:       time.Now(),
-		batchData:    batchData,
+		buffer:       batchData,
 		sequenceID:   sequenceID,
 		sendRequests: callbacks,
 	})
@@ -735,8 +926,11 @@ func (p *partitionProducer) failTimeoutMessages() {
 						WithField("size", size).
 						WithField("properties", sr.msg.Properties)
 				}
+
 				if sr.callback != nil {
-					sr.callback(nil, sr.msg, errSendTimeout)
+					sr.callbackOnce.Do(func() {
+						sr.callback(nil, sr.msg, errSendTimeout)
+					})
 				}
 			}
 
@@ -754,7 +948,7 @@ func (p *partitionProducer) failTimeoutMessages() {
 }
 
 func (p *partitionProducer) internalFlushCurrentBatches() {
-	batchesData, sequenceIDs, callbacks, errors := p.batchBuilder.FlushBatches()
+	batchesData, sequenceIDs, callbacks, errs := p.batchBuilder.FlushBatches()
 	if batchesData == nil {
 		return
 	}
@@ -762,12 +956,18 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
 	for i := range batchesData {
 		// error occurred in processing batch
 		// report it using callback
-		if errors[i] != nil {
+		if errs[i] != nil {
 			for _, cb := range callbacks[i] {
 				if sr, ok := cb.(*sendRequest); ok {
-					sr.callback(nil, sr.msg, errors[i])
+					sr.callback(nil, sr.msg, errs[i])
 				}
 			}
+			if errors.Is(errs[i], internal.ErrExceedMaxMessageSize) {
+				p.log.WithError(errMessageTooLarge).
+					Errorf("internal err: %s", errs[i])
+				p.metrics.PublishErrorsMsgTooLarge.Inc()
+				return
+			}
 			continue
 		}
 		if batchesData[i] == nil {
@@ -775,7 +975,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
 		}
 		p.pendingQueue.Put(&pendingItem{
 			sentAt:       time.Now(),
-			batchData:    batchesData[i],
+			buffer:       batchesData[i],
 			sequenceID:   sequenceIDs[i],
 			sendRequests: callbacks[i],
 		})
@@ -836,6 +1036,7 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes
 
 	// wait for send request to finish
 	<-doneCh
+
 	return msgID, err
 }
 
@@ -852,33 +1053,30 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
 		return
 	}
 
+	// bc is only used for blocking when DisableBlockIfQueueFull is false
+	bc := make(chan struct{})
+
+	// callbackOnce makes sure the callback is invoked only once when a message is chunked
+	callbackOnce := &sync.Once{}
+
 	sr := &sendRequest{
 		ctx:              ctx,
 		msg:              msg,
 		callback:         callback,
+		callbackOnce:     callbackOnce,
 		flushImmediately: flushImmediately,
 		publishTime:      time.Now(),
+		blockCh:          bc,
+		closeBlockChOnce: &sync.Once{},
 	}
 	p.options.Interceptors.BeforeSend(p, msg)
 
-	if p.options.DisableBlockIfQueueFull {
-		if !p.publishSemaphore.TryAcquire() {
-			if callback != nil {
-				callback(nil, msg, errSendQueueIsFull)
-			}
-			return
-		}
-	} else {
-		if !p.publishSemaphore.Acquire(ctx) {
-			callback(nil, msg, errContextExpired)
-			return
-		}
-	}
-
-	p.metrics.MessagesPending.Inc()
-	p.metrics.BytesPending.Add(float64(len(sr.msg.Payload)))
-
 	p.eventsChan <- sr
+
+	if !p.options.DisableBlockIfQueueFull {
+		// block if queue full
+		<-bc
+	}
 }
 
 func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
@@ -933,11 +1131,34 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 					p.partitionIdx,
 				)
 
-				if sr.callback != nil {
-					sr.callback(msgID, sr.msg, nil)
+				if sr.totalChunks > 1 {
+					if sr.chunkID == 0 {
+						sr.chunkRecorder.setFirstChunkID(
+							messageID{
+								int64(response.MessageId.GetLedgerId()),
+								int64(response.MessageId.GetEntryId()),
+								-1,
+								p.partitionIdx,
+							})
+					} else if sr.chunkID == sr.totalChunks-1 {
+						sr.chunkRecorder.setLastChunkID(
+							messageID{
+								int64(response.MessageId.GetLedgerId()),
+								int64(response.MessageId.GetEntryId()),
+								-1,
+								p.partitionIdx,
+							})
+						// use chunkMsgID to set msgID
+						msgID = sr.chunkRecorder.chunkedMsgID
+					}
 				}
 
-				p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
+				if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
+					if sr.callback != nil {
+						sr.callback(msgID, sr.msg, nil)
+					}
+					p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
+				}
 			}
 		}
 
@@ -966,8 +1187,10 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
 		p.log.Info("Closed producer")
 	}
 
-	if err = p.batchBuilder.Close(); err != nil {
-		p.log.WithError(err).Warn("Failed to close batch builder")
+	if p.batchBuilder != nil {
+		if err = p.batchBuilder.Close(); err != nil {
+			p.log.WithError(err).Warn("Failed to close batch builder")
+		}
 	}
 
 	p.setProducerState(producerClosed)
@@ -1024,8 +1247,22 @@ type sendRequest struct {
 	ctx              context.Context
 	msg              *ProducerMessage
 	callback         func(MessageID, *ProducerMessage, error)
+	callbackOnce     *sync.Once
 	publishTime      time.Time
 	flushImmediately bool
+	blockCh          chan struct{}
+	closeBlockChOnce *sync.Once
+	totalChunks      int
+	chunkID          int
+	uuid             string
+	chunkRecorder    *chunkRecorder
+}
+
+// stopBlock can be safely invoked multiple times
+func (sr *sendRequest) stopBlock() {
+	sr.closeBlockChOnce.Do(func() {
+		close(sr.blockCh)
+	})
 }
 
 type closeProducer struct {
@@ -1042,7 +1279,7 @@ func (i *pendingItem) Complete() {
 		return
 	}
 	i.completed = true
-	buffersPool.Put(i.batchData)
+	buffersPool.Put(i.buffer)
 }
 
 // _setConn sets the internal connection field of this partition producer atomically.
@@ -1059,3 +1296,38 @@ func (p *partitionProducer) _getConn() internal.Connection {
 	//            invariant is broken
 	return p.conn.Load().(internal.Connection)
 }
+
+func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool {
+	if p.options.DisableBlockIfQueueFull {
+		if !p.publishSemaphore.TryAcquire() {
+			if sr.callback != nil {
+				sr.callback(nil, sr.msg, errSendQueueIsFull)
+			}
+			return false
+		}
+	} else if !p.publishSemaphore.Acquire(sr.ctx) {
+		sr.callback(nil, sr.msg, errContextExpired)
+		return false
+	}
+	p.metrics.MessagesPending.Inc()
+	p.metrics.BytesPending.Add(float64(len(sr.msg.Payload)))
+	return true
+}
+
+type chunkRecorder struct {
+	chunkedMsgID chunkMessageID
+}
+
+func newChunkRecorder() *chunkRecorder {
+	return &chunkRecorder{
+		chunkedMsgID: chunkMessageID{},
+	}
+}
+
+func (c *chunkRecorder) setFirstChunkID(msgID messageID) {
+	c.chunkedMsgID.firstChunkID = msgID
+}
+
+func (c *chunkRecorder) setLastChunkID(msgID messageID) {
+	c.chunkedMsgID.messageID = msgID
+}
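
The chunk-splitting above boils down to a small amount of sizing arithmetic; a standalone sketch of that logic (the function and variable names are illustrative, not exported by the client):

    package main

    import (
        "fmt"
        "math"
    )

    // chunkPlan mirrors the sizing in internalSend: the usable chunk payload is the
    // broker limit minus the serialized message metadata, optionally capped by
    // ChunkMaxMessageSize; the chunk count is the ceiling of the compressed payload
    // size over that chunk payload size.
    func chunkPlan(maxMessageSize, metadataSize, chunkMaxMessageSize, compressedSize int) (int, int, error) {
        payloadChunkSize := maxMessageSize - metadataSize
        if payloadChunkSize <= 0 {
            return 0, 0, fmt.Errorf("message metadata size exceeds MaxMessageSize")
        }
        if chunkMaxMessageSize != 0 {
            payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(chunkMaxMessageSize)))
        }
        totalChunks := int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
        return payloadChunkSize, totalChunks, nil
    }

    func main() {
        // Example numbers only: a 5 MiB broker limit, ~100 bytes of metadata,
        // a 1 MiB per-chunk cap, and a 10 MiB compressed payload -> 10 chunks.
        size, chunks, err := chunkPlan(5<<20, 100, 1<<20, 10<<20)
        if err != nil {
            panic(err)
        }
        fmt.Println(size, chunks)
    }

Each chunk then goes through internalSingleSend with the shared metadata (Uuid, NumChunksFromMsg, TotalChunkMsgSize, ChunkId) and its own pending-queue permit, and the caller's callback fires only once, on the receipt of the last chunk, with the combined chunkMessageID.
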
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index dc13f50..f193ffd 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"net/http"
 	"strconv"
@@ -934,14 +935,45 @@ func TestMaxMessageSize(t *testing.T) {
 	assert.NotNil(t, producer)
 	defer producer.Close()
 
+	// producer2 disables batching
+	producer2, err := client.CreateProducer(ProducerOptions{
+		Topic:           newTopicName(),
+		DisableBatching: true,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer2)
+	defer producer2.Close()
+
+	// When serverMaxMessageSize=1024, the batch payload is 1041 bytes.
+	// The total size is made up of:
+	// | singleMsgMetadataLength | singleMsgMetadata | payload |
+	// | ----------------------- | ----------------- | ------- |
+	// | 4                       | 13                | 1024    |
+	// So when bias <= 0, the uncompressed payload does not exceed maxMessageSize,
+	// but the encrypted payload size does, and Send() returns an internal error.
+	// When bias = 1, the first maxMessageSize check (on the uncompressed payload) already fails,
+	// so Send() returns errMessageTooLarge.
 	for bias := -1; bias <= 1; bias++ {
 		payload := make([]byte, serverMaxMessageSize+bias)
 		ID, err := producer.Send(context.Background(), &ProducerMessage{
 			Payload: payload,
 		})
 		if bias <= 0 {
-			assert.NoError(t, err)
-			assert.NotNil(t, ID)
+			assert.Equal(t, true, errors.Is(err, internal.ErrExceedMaxMessageSize))
+			assert.Nil(t, ID)
+		} else {
+			assert.Equal(t, errMessageTooLarge, err)
+		}
+	}
+
+	for bias := -1; bias <= 1; bias++ {
+		payload := make([]byte, serverMaxMessageSize+bias)
+		ID, err := producer2.Send(context.Background(), &ProducerMessage{
+			Payload: payload,
+		})
+		if bias <= 0 {
+			assert.Equal(t, true, errors.Is(err, internal.ErrExceedMaxMessageSize))
+			assert.Nil(t, ID)
 		} else {
 			assert.Equal(t, errMessageTooLarge, err)
 		}