You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ti...@apache.org on 2023/10/25 11:44:40 UTC

[pulsar-client-go] branch master updated: fix: normalize all send request resource release into sr.done (#1121)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c14fb7a fix: normalize all send request resource release into sr.done (#1121)
7c14fb7a is described below

commit 7c14fb7a8e7c783db9d5738c3b2aa65d56f2db82
Author: tison <wa...@gmail.com>
AuthorDate: Wed Oct 25 19:44:33 2023 +0800

    fix: normalize all send request resource release into sr.done (#1121)
    
    Signed-off-by: tison <wa...@gmail.com>
    Co-authored-by: gunli <gu...@tencent.com>
---
 pulsar/message_chunking_test.go |  15 +-
 pulsar/producer_partition.go    | 419 +++++++++++++++++++++-------------------
 pulsar/producer_test.go         |   6 +-
 pulsar/transaction_test.go      |   1 -
 4 files changed, 228 insertions(+), 213 deletions(-)

diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go
index fbdcaa0c..59fdb5ec 100644
--- a/pulsar/message_chunking_test.go
+++ b/pulsar/message_chunking_test.go
@@ -24,6 +24,7 @@ import (
 	"math/rand"
 	"net/http"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -531,12 +532,13 @@ func TestChunkBlockIfQueueFull(t *testing.T) {
 	assert.NotNil(t, producer)
 	defer producer.Close()
 
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
 	// Large messages will be split into 11 chunks, exceeding the length of pending queue
-	ID, err := producer.Send(context.Background(), &ProducerMessage{
+	_, err = producer.Send(ctx, &ProducerMessage{
 		Payload: createTestMessagePayload(100),
 	})
-	assert.NoError(t, err)
-	assert.NotNil(t, ID)
+	assert.Error(t, err)
 }
 
 func createTestMessagePayload(size int) []byte {
@@ -566,18 +568,15 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) {
 		&sendRequest{
 			callback: func(id MessageID, producerMessage *ProducerMessage, err error) {
 			},
+			callbackOnce:        &sync.Once{},
 			ctx:                 context.Background(),
 			msg:                 msg,
+			producer:            producerImpl,
 			flushImmediately:    true,
 			totalChunks:         totalChunks,
 			chunkID:             chunkID,
 			uuid:                uuid,
 			chunkRecorder:       newChunkRecorder(),
-			transaction:         nil,
-			reservedMem:         0,
-			sendAsBatch:         false,
-			schema:              nil,
-			schemaVersion:       nil,
 			uncompressedPayload: wholePayload,
 			uncompressedSize:    int64(len(wholePayload)),
 			compressedPayload:   wholePayload,
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b00ed6b5..f606fe05 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -251,7 +251,7 @@ func (p *partitionProducer) grabCnx() error {
 	res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
 	if err != nil {
 		p.log.WithError(err).Error("Failed to create producer at send PRODUCER request")
-		if err == internal.ErrRequestTimeOut {
+		if errors.Is(err, internal.ErrRequestTimeOut) {
 			id := p.client.rpcClient.NewRequestID()
 			_, _ = p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_CLOSE_PRODUCER,
 				&pb.CommandCloseProducer{
@@ -481,45 +481,28 @@ func runCallback(cb func(MessageID, *ProducerMessage, error), id MessageID, msg
 func (p *partitionProducer) internalSend(sr *sendRequest) {
 	p.log.Debug("Received send request: ", *sr.msg)
 
-	msg := sr.msg
-
-	if !p.canAddToQueue(sr) {
-		return
-	}
-
-	// try to reserve memory for uncompressedPayload
-	if !p.canReserveMem(sr, sr.uncompressedSize) {
-		return
-	}
-
-	if err := p.updateChunkInfo(sr); err != nil {
-		p.releaseSemaphoreAndMem(sr.uncompressedSize)
-		runCallback(sr.callback, nil, sr.msg, err)
-		p.metrics.PublishErrorsMsgTooLarge.Inc()
-		return
-	}
-
 	if sr.sendAsBatch {
-		smm := p.genSingleMessageMetadataInBatch(msg, int(sr.uncompressedSize))
+		smm := p.genSingleMessageMetadataInBatch(sr.msg, int(sr.uncompressedSize))
 		multiSchemaEnabled := !p.options.DisableMultiSchema
-		added := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion,
-			multiSchemaEnabled)
+
+		added := addRequestToBatch(
+			smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled)
 		if !added {
 			// The current batch is full. flush it and retry
-
 			p.internalFlushCurrentBatch()
 
 			// after flushing try again to add the current payload
-			if ok := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion,
-				multiSchemaEnabled); !ok {
-				p.releaseSemaphoreAndMem(sr.uncompressedSize)
-				runCallback(sr.callback, nil, sr.msg, errFailAddToBatch)
+			ok := addRequestToBatch(
+				smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled)
+			if !ok {
 				p.log.WithField("size", sr.uncompressedSize).
-					WithField("properties", msg.Properties).
+					WithField("properties", sr.msg.Properties).
 					Error("unable to add message to batch")
+				sr.done(nil, errFailAddToBatch)
 				return
 			}
 		}
+
 		if sr.flushImmediately {
 			p.internalFlushCurrentBatch()
 		}
@@ -547,6 +530,7 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
 		nsr := &sendRequest{
 			ctx:                 sr.ctx,
 			msg:                 sr.msg,
+			producer:            sr.producer,
 			callback:            sr.callback,
 			callbackOnce:        sr.callbackOnce,
 			publishTime:         sr.publishTime,
@@ -556,6 +540,8 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
 			uuid:                uuid,
 			chunkRecorder:       cr,
 			transaction:         sr.transaction,
+			memLimit:            sr.memLimit,
+			semaphore:           sr.semaphore,
 			reservedMem:         int64(rhs - lhs),
 			sendAsBatch:         sr.sendAsBatch,
 			schema:              sr.schema,
@@ -569,12 +555,8 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
 			deliverAt:           sr.deliverAt,
 			maxMessageSize:      sr.maxMessageSize,
 		}
-		// the permit of first chunk has acquired
-		if chunkID != 0 && !p.canAddToQueue(nsr) {
-			p.releaseSemaphoreAndMem(sr.uncompressedSize - int64(lhs))
-			return
-		}
-		p.internalSingleSend(sr.mm, sr.compressedPayload[lhs:rhs], nsr, uint32(sr.maxMessageSize))
+
+		p.internalSingleSend(nsr.mm, nsr.compressedPayload[lhs:rhs], nsr, uint32(nsr.maxMessageSize))
 	}
 }
 
@@ -675,11 +657,13 @@ func (p *partitionProducer) genSingleMessageMetadataInBatch(
 	return
 }
 
-func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
+func (p *partitionProducer) internalSingleSend(
+	mm *pb.MessageMetadata,
 	compressedPayload []byte,
-	request *sendRequest,
-	maxMessageSize uint32) {
-	msg := request.msg
+	sr *sendRequest,
+	maxMessageSize uint32,
+) {
+	msg := sr.msg
 
 	payloadBuf := internal.NewBuffer(len(compressedPayload))
 	payloadBuf.Write(compressedPayload)
@@ -694,8 +678,8 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
 	var mostSigBits uint64
 	var leastSigBits uint64
 
-	if request.transaction != nil {
-		txnID := request.transaction.GetTxnID()
+	if sr.transaction != nil {
+		txnID := sr.transaction.GetTxnID()
 		useTxn = true
 		mostSigBits = txnID.MostSigBits
 		leastSigBits = txnID.LeastSigBits
@@ -715,8 +699,7 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
 	)
 
 	if err != nil {
-		runCallback(request.callback, nil, request.msg, err)
-		p.releaseSemaphoreAndMem(request.reservedMem)
+		sr.done(nil, err)
 		p.log.WithError(err).Errorf("Single message serialize failed %s", msg.Value)
 		return
 	}
@@ -725,7 +708,7 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
 		sentAt:       time.Now(),
 		buffer:       buffer,
 		sequenceID:   sid,
-		sendRequests: []interface{}{request},
+		sendRequests: []interface{}{sr},
 	})
 	p._getConn().WriteData(buffer)
 }
@@ -756,15 +739,14 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 	if err != nil {
 		for _, cb := range callbacks {
 			if sr, ok := cb.(*sendRequest); ok {
-				runCallback(sr.callback, nil, sr.msg, err)
+				sr.done(nil, err)
 			}
 		}
+
 		if errors.Is(err, internal.ErrExceedMaxMessageSize) {
-			p.log.WithError(errMessageTooLarge).
-				Errorf("internal err: %s", err)
-			p.metrics.PublishErrorsMsgTooLarge.Inc()
-			return
+			p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", err)
 		}
+
 		return
 	}
 
@@ -853,25 +835,7 @@ func (p *partitionProducer) failTimeoutMessages() {
 
 			for _, i := range pi.sendRequests {
 				sr := i.(*sendRequest)
-				if sr.msg != nil {
-					size := len(sr.msg.Payload)
-					p.releaseSemaphoreAndMem(sr.reservedMem)
-					p.metrics.MessagesPending.Dec()
-					p.metrics.BytesPending.Sub(float64(size))
-					p.metrics.PublishErrorsTimeout.Inc()
-					p.log.WithError(errSendTimeout).
-						WithField("size", size).
-						WithField("properties", sr.msg.Properties)
-				}
-
-				if sr.callback != nil {
-					sr.callbackOnce.Do(func() {
-						runCallback(sr.callback, nil, sr.msg, errSendTimeout)
-					})
-				}
-				if sr.transaction != nil {
-					sr.transaction.endSendOrAckOp(nil)
-				}
+				sr.done(nil, errSendTimeout)
 			}
 
 			// flag the sending has completed with error, flush make no effect
@@ -899,15 +863,15 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
 		if errs[i] != nil {
 			for _, cb := range callbacks[i] {
 				if sr, ok := cb.(*sendRequest); ok {
-					runCallback(sr.callback, nil, sr.msg, errs[i])
+					sr.done(nil, errs[i])
 				}
 			}
+
 			if errors.Is(errs[i], internal.ErrExceedMaxMessageSize) {
-				p.log.WithError(errMessageTooLarge).
-					Errorf("internal err: %s", errs[i])
-				p.metrics.PublishErrorsMsgTooLarge.Inc()
+				p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", errs[i])
 				return
 			}
+
 			continue
 		}
 		if batchesData[i] == nil {
@@ -1036,12 +1000,6 @@ func (p *partitionProducer) prepareTransaction(sr *sendRequest) error {
 	}
 
 	sr.transaction = txn
-	callback := sr.callback
-	sr.callback = func(id MessageID, producerMessage *ProducerMessage, err error) {
-		runCallback(callback, id, producerMessage, err)
-		txn.endSendOrAckOp(err)
-	}
-
 	return nil
 }
 
@@ -1188,19 +1146,21 @@ func (p *partitionProducer) internalSendAsync(
 	sr := &sendRequest{
 		ctx:              ctx,
 		msg:              msg,
+		producer:         p,
 		callback:         callback,
 		callbackOnce:     &sync.Once{},
 		flushImmediately: flushImmediately,
 		publishTime:      time.Now(),
+		chunkID:          -1,
 	}
+
 	if err := p.prepareTransaction(sr); err != nil {
-		runCallback(sr.callback, nil, msg, err)
+		sr.done(nil, err)
 		return
 	}
 
 	if p.getProducerState() != producerReady {
-		// Producer is closing
-		runCallback(sr.callback, nil, msg, errProducerClosed)
+		sr.done(nil, errProducerClosed)
 		return
 	}
 
@@ -1208,18 +1168,30 @@ func (p *partitionProducer) internalSendAsync(
 
 	if err := p.updateSchema(sr); err != nil {
 		p.log.Error(err)
-		runCallback(sr.callback, nil, msg, err)
+		sr.done(nil, err)
 		return
 	}
 
 	if err := p.updateUncompressedPayload(sr); err != nil {
 		p.log.Error(err)
-		runCallback(sr.callback, nil, msg, err)
+		sr.done(nil, err)
 		return
 	}
 
 	p.updateMetaData(sr)
 
+	if err := p.updateChunkInfo(sr); err != nil {
+		p.log.Error(err)
+		sr.done(nil, err)
+		return
+	}
+
+	if err := p.reserveResources(sr); err != nil {
+		p.log.Error(err)
+		sr.done(nil, err)
+		return
+	}
+
 	p.dataChan <- sr
 }
 
@@ -1257,55 +1229,40 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 		for idx, i := range pi.sendRequests {
 			sr := i.(*sendRequest)
 			atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
-			p.releaseSemaphoreAndMem(sr.reservedMem)
-			p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
-			p.metrics.MessagesPublished.Inc()
-			p.metrics.MessagesPending.Dec()
-			payloadSize := float64(len(sr.msg.Payload))
-			p.metrics.BytesPublished.Add(payloadSize)
-			p.metrics.BytesPending.Sub(payloadSize)
-
-			if sr.callback != nil || len(p.options.Interceptors) > 0 {
-				msgID := newMessageID(
-					int64(response.MessageId.GetLedgerId()),
-					int64(response.MessageId.GetEntryId()),
-					int32(idx),
-					p.partitionIdx,
-					batchSize,
-				)
-
-				if sr.totalChunks > 1 {
-					if sr.chunkID == 0 {
-						sr.chunkRecorder.setFirstChunkID(
-							&messageID{
-								int64(response.MessageId.GetLedgerId()),
-								int64(response.MessageId.GetEntryId()),
-								-1,
-								p.partitionIdx,
-								0,
-							})
-					} else if sr.chunkID == sr.totalChunks-1 {
-						sr.chunkRecorder.setLastChunkID(
-							&messageID{
-								int64(response.MessageId.GetLedgerId()),
-								int64(response.MessageId.GetEntryId()),
-								-1,
-								p.partitionIdx,
-								0,
-							})
-						// use chunkMsgID to set msgID
-						msgID = &sr.chunkRecorder.chunkedMsgID
-					}
-				}
 
-				if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
-					runCallback(sr.callback, msgID, sr.msg, nil)
-					p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
+			msgID := newMessageID(
+				int64(response.MessageId.GetLedgerId()),
+				int64(response.MessageId.GetEntryId()),
+				int32(idx),
+				p.partitionIdx,
+				batchSize,
+			)
+
+			if sr.totalChunks > 1 {
+				if sr.chunkID == 0 {
+					sr.chunkRecorder.setFirstChunkID(
+						&messageID{
+							int64(response.MessageId.GetLedgerId()),
+							int64(response.MessageId.GetEntryId()),
+							-1,
+							p.partitionIdx,
+							0,
+						})
+				} else if sr.chunkID == sr.totalChunks-1 {
+					sr.chunkRecorder.setLastChunkID(
+						&messageID{
+							int64(response.MessageId.GetLedgerId()),
+							int64(response.MessageId.GetEntryId()),
+							-1,
+							p.partitionIdx,
+							0,
+						})
+					// use chunkMsgID to set msgID
+					msgID = &sr.chunkRecorder.chunkedMsgID
 				}
 			}
-			if sr.transaction != nil {
-				sr.transaction.endSendOrAckOp(nil)
-			}
+
+			sr.done(msgID, nil)
 		}
 
 		// Mark this pending item as done
@@ -1372,24 +1329,7 @@ func (p *partitionProducer) failPendingMessages() {
 
 		for _, i := range pi.sendRequests {
 			sr := i.(*sendRequest)
-			if sr.msg != nil {
-				size := len(sr.msg.Payload)
-				p.releaseSemaphoreAndMem(sr.reservedMem)
-				p.metrics.MessagesPending.Dec()
-				p.metrics.BytesPending.Sub(float64(size))
-				p.log.WithError(errProducerClosed).
-					WithField("size", size).
-					WithField("properties", sr.msg.Properties)
-			}
-
-			if sr.callback != nil {
-				sr.callbackOnce.Do(func() {
-					runCallback(sr.callback, nil, sr.msg, errProducerClosed)
-				})
-			}
-			if sr.transaction != nil {
-				sr.transaction.endSendOrAckOp(nil)
-			}
+			sr.done(nil, errProducerClosed)
 		}
 
 		// flag the sending has completed with error, flush make no effect
@@ -1448,19 +1388,29 @@ func (p *partitionProducer) Close() {
 }
 
 type sendRequest struct {
-	ctx                 context.Context
-	msg                 *ProducerMessage
-	callback            func(MessageID, *ProducerMessage, error)
-	callbackOnce        *sync.Once
-	publishTime         time.Time
-	flushImmediately    bool
-	totalChunks         int
-	chunkID             int
-	uuid                string
-	chunkRecorder       *chunkRecorder
-	transaction         *transaction
-	reservedMem         int64
+	ctx              context.Context
+	msg              *ProducerMessage
+	producer         *partitionProducer
+	callback         func(MessageID, *ProducerMessage, error)
+	callbackOnce     *sync.Once
+	publishTime      time.Time
+	flushImmediately bool
+	totalChunks      int
+	chunkID          int
+	uuid             string
+	chunkRecorder    *chunkRecorder
+
+	/// resource management
+
+	memLimit          internal.MemoryLimitController
+	reservedMem       int64
+	semaphore         internal.Semaphore
+	reservedSemaphore int
+
+	/// convey settable state
+
 	sendAsBatch         bool
+	transaction         *transaction
 	schema              Schema
 	schemaVersion       []byte
 	uncompressedPayload []byte
@@ -1473,6 +1423,116 @@ type sendRequest struct {
 	maxMessageSize      int32
 }
 
+func (sr *sendRequest) done(msgID MessageID, err error) {
+	if err == nil {
+		sr.producer.metrics.PublishLatency.Observe(float64(time.Now().UnixNano()-sr.publishTime.UnixNano()) / 1.0e9)
+		sr.producer.metrics.MessagesPublished.Inc()
+		sr.producer.metrics.BytesPublished.Add(float64(sr.reservedMem))
+
+		if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
+			if sr.producer.options.Interceptors != nil {
+				sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID)
+			}
+		}
+	}
+
+	if err != nil {
+		sr.producer.log.WithError(err).
+			WithField("size", sr.reservedMem).
+			WithField("properties", sr.msg.Properties)
+	}
+
+	if errors.Is(err, errSendTimeout) {
+		sr.producer.metrics.PublishErrorsTimeout.Inc()
+	}
+
+	if errors.Is(err, errMessageTooLarge) {
+		sr.producer.metrics.PublishErrorsMsgTooLarge.Inc()
+	}
+
+	if sr.semaphore != nil {
+		sr.semaphore.Release()
+		sr.producer.metrics.MessagesPending.Dec()
+	}
+
+	if sr.memLimit != nil {
+		sr.memLimit.ReleaseMemory(sr.reservedMem)
+		sr.producer.metrics.BytesPending.Sub(float64(sr.reservedMem))
+	}
+
+	// sr.chunkID == -1 means a chunked message is not yet prepared, so that we should fail it immediately
+	if sr.totalChunks <= 1 || sr.chunkID == -1 || sr.chunkID == sr.totalChunks-1 {
+		sr.callbackOnce.Do(func() {
+			runCallback(sr.callback, msgID, sr.msg, err)
+		})
+
+		if sr.transaction != nil {
+			sr.transaction.endSendOrAckOp(err)
+		}
+	}
+}
+
+func (p *partitionProducer) reserveSemaphore(sr *sendRequest) error {
+	for i := 0; i < sr.totalChunks; i++ {
+		if p.options.DisableBlockIfQueueFull {
+			if !p.publishSemaphore.TryAcquire() {
+				return errSendQueueIsFull
+			}
+
+			// update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case
+			// of that only a part of the chunks acquire succeed
+			sr.semaphore = p.publishSemaphore
+			sr.reservedSemaphore++
+			p.metrics.MessagesPending.Inc()
+		} else {
+			if !p.publishSemaphore.Acquire(sr.ctx) {
+				return errContextExpired
+			}
+
+			// update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case
+			// of that only a part of the chunks acquire succeed
+			sr.semaphore = p.publishSemaphore
+			sr.reservedSemaphore++
+			p.metrics.MessagesPending.Inc()
+		}
+	}
+
+	return nil
+}
+
+func (p *partitionProducer) reserveMem(sr *sendRequest) error {
+	requiredMem := sr.uncompressedSize
+	if !sr.sendAsBatch {
+		requiredMem = int64(sr.compressedSize)
+	}
+
+	if p.options.DisableBlockIfQueueFull {
+		if !p.client.memLimit.TryReserveMemory(requiredMem) {
+			return errMemoryBufferIsFull
+		}
+
+	} else {
+		if !p.client.memLimit.ReserveMemory(sr.ctx, requiredMem) {
+			return errContextExpired
+		}
+	}
+
+	sr.memLimit = p.client.memLimit
+	sr.reservedMem += requiredMem
+	p.metrics.BytesPending.Add(float64(requiredMem))
+	return nil
+}
+
+func (p *partitionProducer) reserveResources(sr *sendRequest) error {
+	if err := p.reserveSemaphore(sr); err != nil {
+		return err
+	}
+	if err := p.reserveMem(sr); err != nil {
+		return err
+	}
+	return nil
+}
+
 type closeProducer struct {
 	doneCh chan struct{}
 }
@@ -1502,53 +1562,12 @@ func (p *partitionProducer) _setConn(conn internal.Connection) {
 // _getConn returns internal connection field of this partition producer atomically.
 // Note: should only be called by this partition producer before attempting to use the connection
 func (p *partitionProducer) _getConn() internal.Connection {
-	// Invariant: The conn must be non-nil for the lifetime of the partitionProducer.
+	// Invariant: p.conn must be non-nil for the lifetime of the partitionProducer.
 	//            For this reason we leave this cast unchecked and panic() if the
 	//            invariant is broken
 	return p.conn.Load().(internal.Connection)
 }
 
-func (p *partitionProducer) releaseSemaphoreAndMem(size int64) {
-	p.publishSemaphore.Release()
-	p.client.memLimit.ReleaseMemory(size)
-}
-
-func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool {
-	if p.options.DisableBlockIfQueueFull {
-		if !p.publishSemaphore.TryAcquire() {
-			runCallback(sr.callback, nil, sr.msg, errSendQueueIsFull)
-			return false
-		}
-	} else {
-		if !p.publishSemaphore.Acquire(sr.ctx) {
-			runCallback(sr.callback, nil, sr.msg, errContextExpired)
-			return false
-		}
-	}
-	p.metrics.MessagesPending.Inc()
-	return true
-}
-
-func (p *partitionProducer) canReserveMem(sr *sendRequest, size int64) bool {
-	if p.options.DisableBlockIfQueueFull {
-		if !p.client.memLimit.TryReserveMemory(size) {
-			p.publishSemaphore.Release()
-			runCallback(sr.callback, nil, sr.msg, errMemoryBufferIsFull)
-			return false
-		}
-
-	} else {
-		if !p.client.memLimit.ReserveMemory(sr.ctx, size) {
-			p.publishSemaphore.Release()
-			runCallback(sr.callback, nil, sr.msg, errContextExpired)
-			return false
-		}
-	}
-	sr.reservedMem += size
-	p.metrics.BytesPending.Add(float64(size))
-	return true
-}
-
 type chunkRecorder struct {
 	chunkedMsgID chunkMessageID
 }
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 29ffa780..49e225f3 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -2079,7 +2079,6 @@ func TestMemLimitRejectProducerMessagesWithSchema(t *testing.T) {
 }
 
 func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) {
-
 	c, err := NewClient(ClientOptions{
 		URL:              serviceURL,
 		MemoryLimitBytes: 5 * 1024,
@@ -2136,12 +2135,11 @@ func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) {
 		SendTimeout:             2 * time.Second,
 	})
 
-	// producer2 will reserve 2*1024 bytes and then release 1024 byte (release the second chunk)
-	// because it reaches MaxPendingMessages in chunking
+	// producer3 cannot reserve 2*1024 bytes because it reaches MaxPendingMessages in chunking
 	_, _ = producer3.Send(context.Background(), &ProducerMessage{
 		Payload: make([]byte, 2*1024),
 	})
-	assert.Equal(t, int64(1024), c.(*client).memLimit.CurrentUsage())
+	assert.Zero(t, c.(*client).memLimit.CurrentUsage())
 }
 
 func TestMemLimitContextCancel(t *testing.T) {
diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go
index 385b197e..74e8dd0c 100644
--- a/pulsar/transaction_test.go
+++ b/pulsar/transaction_test.go
@@ -458,7 +458,6 @@ func TestAckChunkMessage(t *testing.T) {
 	// Create transaction and register the send operation.
 	txn, err := client.NewTransaction(time.Hour)
 	require.Nil(t, err)
-	txn.(*transaction).registerSendOrAckOp()
 
 	// Create a producer with chunking enabled to send a large message that will be split into chunks.
 	producer, err := client.CreateProducer(ProducerOptions{