You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ti...@apache.org on 2023/10/25 11:44:40 UTC
[pulsar-client-go] branch master updated: fix: normalize all send request resource release into sr.done (#1121)
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 7c14fb7a fix: normalize all send request resource release into sr.done (#1121)
7c14fb7a is described below
commit 7c14fb7a8e7c783db9d5738c3b2aa65d56f2db82
Author: tison <wa...@gmail.com>
AuthorDate: Wed Oct 25 19:44:33 2023 +0800
fix: normalize all send request resource release into sr.done (#1121)
Signed-off-by: tison <wa...@gmail.com>
Co-authored-by: gunli <gu...@tencent.com>
---
pulsar/message_chunking_test.go | 15 +-
pulsar/producer_partition.go | 419 +++++++++++++++++++++-------------------
pulsar/producer_test.go | 6 +-
pulsar/transaction_test.go | 1 -
4 files changed, 228 insertions(+), 213 deletions(-)
diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go
index fbdcaa0c..59fdb5ec 100644
--- a/pulsar/message_chunking_test.go
+++ b/pulsar/message_chunking_test.go
@@ -24,6 +24,7 @@ import (
"math/rand"
"net/http"
"strings"
+ "sync"
"testing"
"time"
@@ -531,12 +532,13 @@ func TestChunkBlockIfQueueFull(t *testing.T) {
assert.NotNil(t, producer)
defer producer.Close()
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
// Large messages will be split into 11 chunks, exceeding the length of pending queue
- ID, err := producer.Send(context.Background(), &ProducerMessage{
+ _, err = producer.Send(ctx, &ProducerMessage{
Payload: createTestMessagePayload(100),
})
- assert.NoError(t, err)
- assert.NotNil(t, ID)
+ assert.Error(t, err)
}
func createTestMessagePayload(size int) []byte {
@@ -566,18 +568,15 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) {
&sendRequest{
callback: func(id MessageID, producerMessage *ProducerMessage, err error) {
},
+ callbackOnce: &sync.Once{},
ctx: context.Background(),
msg: msg,
+ producer: producerImpl,
flushImmediately: true,
totalChunks: totalChunks,
chunkID: chunkID,
uuid: uuid,
chunkRecorder: newChunkRecorder(),
- transaction: nil,
- reservedMem: 0,
- sendAsBatch: false,
- schema: nil,
- schemaVersion: nil,
uncompressedPayload: wholePayload,
uncompressedSize: int64(len(wholePayload)),
compressedPayload: wholePayload,
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b00ed6b5..f606fe05 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -251,7 +251,7 @@ func (p *partitionProducer) grabCnx() error {
res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
if err != nil {
p.log.WithError(err).Error("Failed to create producer at send PRODUCER request")
- if err == internal.ErrRequestTimeOut {
+ if errors.Is(err, internal.ErrRequestTimeOut) {
id := p.client.rpcClient.NewRequestID()
_, _ = p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_CLOSE_PRODUCER,
&pb.CommandCloseProducer{
@@ -481,45 +481,28 @@ func runCallback(cb func(MessageID, *ProducerMessage, error), id MessageID, msg
func (p *partitionProducer) internalSend(sr *sendRequest) {
p.log.Debug("Received send request: ", *sr.msg)
- msg := sr.msg
-
- if !p.canAddToQueue(sr) {
- return
- }
-
- // try to reserve memory for uncompressedPayload
- if !p.canReserveMem(sr, sr.uncompressedSize) {
- return
- }
-
- if err := p.updateChunkInfo(sr); err != nil {
- p.releaseSemaphoreAndMem(sr.uncompressedSize)
- runCallback(sr.callback, nil, sr.msg, err)
- p.metrics.PublishErrorsMsgTooLarge.Inc()
- return
- }
-
if sr.sendAsBatch {
- smm := p.genSingleMessageMetadataInBatch(msg, int(sr.uncompressedSize))
+ smm := p.genSingleMessageMetadataInBatch(sr.msg, int(sr.uncompressedSize))
multiSchemaEnabled := !p.options.DisableMultiSchema
- added := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion,
- multiSchemaEnabled)
+
+ added := addRequestToBatch(
+ smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled)
if !added {
// The current batch is full. flush it and retry
-
p.internalFlushCurrentBatch()
// after flushing try again to add the current payload
- if ok := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion,
- multiSchemaEnabled); !ok {
- p.releaseSemaphoreAndMem(sr.uncompressedSize)
- runCallback(sr.callback, nil, sr.msg, errFailAddToBatch)
+ ok := addRequestToBatch(
+ smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled)
+ if !ok {
p.log.WithField("size", sr.uncompressedSize).
- WithField("properties", msg.Properties).
+ WithField("properties", sr.msg.Properties).
Error("unable to add message to batch")
+ sr.done(nil, errFailAddToBatch)
return
}
}
+
if sr.flushImmediately {
p.internalFlushCurrentBatch()
}
@@ -547,6 +530,7 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
nsr := &sendRequest{
ctx: sr.ctx,
msg: sr.msg,
+ producer: sr.producer,
callback: sr.callback,
callbackOnce: sr.callbackOnce,
publishTime: sr.publishTime,
@@ -556,6 +540,8 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
uuid: uuid,
chunkRecorder: cr,
transaction: sr.transaction,
+ memLimit: sr.memLimit,
+ semaphore: sr.semaphore,
reservedMem: int64(rhs - lhs),
sendAsBatch: sr.sendAsBatch,
schema: sr.schema,
@@ -569,12 +555,8 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
deliverAt: sr.deliverAt,
maxMessageSize: sr.maxMessageSize,
}
- // the permit of first chunk has acquired
- if chunkID != 0 && !p.canAddToQueue(nsr) {
- p.releaseSemaphoreAndMem(sr.uncompressedSize - int64(lhs))
- return
- }
- p.internalSingleSend(sr.mm, sr.compressedPayload[lhs:rhs], nsr, uint32(sr.maxMessageSize))
+
+ p.internalSingleSend(nsr.mm, nsr.compressedPayload[lhs:rhs], nsr, uint32(nsr.maxMessageSize))
}
}
@@ -675,11 +657,13 @@ func (p *partitionProducer) genSingleMessageMetadataInBatch(
return
}
-func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
+func (p *partitionProducer) internalSingleSend(
+ mm *pb.MessageMetadata,
compressedPayload []byte,
- request *sendRequest,
- maxMessageSize uint32) {
- msg := request.msg
+ sr *sendRequest,
+ maxMessageSize uint32,
+) {
+ msg := sr.msg
payloadBuf := internal.NewBuffer(len(compressedPayload))
payloadBuf.Write(compressedPayload)
@@ -694,8 +678,8 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
var mostSigBits uint64
var leastSigBits uint64
- if request.transaction != nil {
- txnID := request.transaction.GetTxnID()
+ if sr.transaction != nil {
+ txnID := sr.transaction.GetTxnID()
useTxn = true
mostSigBits = txnID.MostSigBits
leastSigBits = txnID.LeastSigBits
@@ -715,8 +699,7 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
)
if err != nil {
- runCallback(request.callback, nil, request.msg, err)
- p.releaseSemaphoreAndMem(request.reservedMem)
+ sr.done(nil, err)
p.log.WithError(err).Errorf("Single message serialize failed %s", msg.Value)
return
}
@@ -725,7 +708,7 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
sentAt: time.Now(),
buffer: buffer,
sequenceID: sid,
- sendRequests: []interface{}{request},
+ sendRequests: []interface{}{sr},
})
p._getConn().WriteData(buffer)
}
@@ -756,15 +739,14 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
if err != nil {
for _, cb := range callbacks {
if sr, ok := cb.(*sendRequest); ok {
- runCallback(sr.callback, nil, sr.msg, err)
+ sr.done(nil, err)
}
}
+
if errors.Is(err, internal.ErrExceedMaxMessageSize) {
- p.log.WithError(errMessageTooLarge).
- Errorf("internal err: %s", err)
- p.metrics.PublishErrorsMsgTooLarge.Inc()
- return
+ p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", err)
}
+
return
}
@@ -853,25 +835,7 @@ func (p *partitionProducer) failTimeoutMessages() {
for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
- if sr.msg != nil {
- size := len(sr.msg.Payload)
- p.releaseSemaphoreAndMem(sr.reservedMem)
- p.metrics.MessagesPending.Dec()
- p.metrics.BytesPending.Sub(float64(size))
- p.metrics.PublishErrorsTimeout.Inc()
- p.log.WithError(errSendTimeout).
- WithField("size", size).
- WithField("properties", sr.msg.Properties)
- }
-
- if sr.callback != nil {
- sr.callbackOnce.Do(func() {
- runCallback(sr.callback, nil, sr.msg, errSendTimeout)
- })
- }
- if sr.transaction != nil {
- sr.transaction.endSendOrAckOp(nil)
- }
+ sr.done(nil, errSendTimeout)
}
// flag the sending has completed with error, flush make no effect
@@ -899,15 +863,15 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
if errs[i] != nil {
for _, cb := range callbacks[i] {
if sr, ok := cb.(*sendRequest); ok {
- runCallback(sr.callback, nil, sr.msg, errs[i])
+ sr.done(nil, errs[i])
}
}
+
if errors.Is(errs[i], internal.ErrExceedMaxMessageSize) {
- p.log.WithError(errMessageTooLarge).
- Errorf("internal err: %s", errs[i])
- p.metrics.PublishErrorsMsgTooLarge.Inc()
+ p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", errs[i])
return
}
+
continue
}
if batchesData[i] == nil {
@@ -1036,12 +1000,6 @@ func (p *partitionProducer) prepareTransaction(sr *sendRequest) error {
}
sr.transaction = txn
- callback := sr.callback
- sr.callback = func(id MessageID, producerMessage *ProducerMessage, err error) {
- runCallback(callback, id, producerMessage, err)
- txn.endSendOrAckOp(err)
- }
-
return nil
}
@@ -1188,19 +1146,21 @@ func (p *partitionProducer) internalSendAsync(
sr := &sendRequest{
ctx: ctx,
msg: msg,
+ producer: p,
callback: callback,
callbackOnce: &sync.Once{},
flushImmediately: flushImmediately,
publishTime: time.Now(),
+ chunkID: -1,
}
+
if err := p.prepareTransaction(sr); err != nil {
- runCallback(sr.callback, nil, msg, err)
+ sr.done(nil, err)
return
}
if p.getProducerState() != producerReady {
- // Producer is closing
- runCallback(sr.callback, nil, msg, errProducerClosed)
+ sr.done(nil, errProducerClosed)
return
}
@@ -1208,18 +1168,30 @@ func (p *partitionProducer) internalSendAsync(
if err := p.updateSchema(sr); err != nil {
p.log.Error(err)
- runCallback(sr.callback, nil, msg, err)
+ sr.done(nil, err)
return
}
if err := p.updateUncompressedPayload(sr); err != nil {
p.log.Error(err)
- runCallback(sr.callback, nil, msg, err)
+ sr.done(nil, err)
return
}
p.updateMetaData(sr)
+ if err := p.updateChunkInfo(sr); err != nil {
+ p.log.Error(err)
+ sr.done(nil, err)
+ return
+ }
+
+ if err := p.reserveResources(sr); err != nil {
+ p.log.Error(err)
+ sr.done(nil, err)
+ return
+ }
+
p.dataChan <- sr
}
@@ -1257,55 +1229,40 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
for idx, i := range pi.sendRequests {
sr := i.(*sendRequest)
atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
- p.releaseSemaphoreAndMem(sr.reservedMem)
- p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
- p.metrics.MessagesPublished.Inc()
- p.metrics.MessagesPending.Dec()
- payloadSize := float64(len(sr.msg.Payload))
- p.metrics.BytesPublished.Add(payloadSize)
- p.metrics.BytesPending.Sub(payloadSize)
-
- if sr.callback != nil || len(p.options.Interceptors) > 0 {
- msgID := newMessageID(
- int64(response.MessageId.GetLedgerId()),
- int64(response.MessageId.GetEntryId()),
- int32(idx),
- p.partitionIdx,
- batchSize,
- )
-
- if sr.totalChunks > 1 {
- if sr.chunkID == 0 {
- sr.chunkRecorder.setFirstChunkID(
- &messageID{
- int64(response.MessageId.GetLedgerId()),
- int64(response.MessageId.GetEntryId()),
- -1,
- p.partitionIdx,
- 0,
- })
- } else if sr.chunkID == sr.totalChunks-1 {
- sr.chunkRecorder.setLastChunkID(
- &messageID{
- int64(response.MessageId.GetLedgerId()),
- int64(response.MessageId.GetEntryId()),
- -1,
- p.partitionIdx,
- 0,
- })
- // use chunkMsgID to set msgID
- msgID = &sr.chunkRecorder.chunkedMsgID
- }
- }
- if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
- runCallback(sr.callback, msgID, sr.msg, nil)
- p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
+ msgID := newMessageID(
+ int64(response.MessageId.GetLedgerId()),
+ int64(response.MessageId.GetEntryId()),
+ int32(idx),
+ p.partitionIdx,
+ batchSize,
+ )
+
+ if sr.totalChunks > 1 {
+ if sr.chunkID == 0 {
+ sr.chunkRecorder.setFirstChunkID(
+ &messageID{
+ int64(response.MessageId.GetLedgerId()),
+ int64(response.MessageId.GetEntryId()),
+ -1,
+ p.partitionIdx,
+ 0,
+ })
+ } else if sr.chunkID == sr.totalChunks-1 {
+ sr.chunkRecorder.setLastChunkID(
+ &messageID{
+ int64(response.MessageId.GetLedgerId()),
+ int64(response.MessageId.GetEntryId()),
+ -1,
+ p.partitionIdx,
+ 0,
+ })
+ // use chunkMsgID to set msgID
+ msgID = &sr.chunkRecorder.chunkedMsgID
}
}
- if sr.transaction != nil {
- sr.transaction.endSendOrAckOp(nil)
- }
+
+ sr.done(msgID, nil)
}
// Mark this pending item as done
@@ -1372,24 +1329,7 @@ func (p *partitionProducer) failPendingMessages() {
for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
- if sr.msg != nil {
- size := len(sr.msg.Payload)
- p.releaseSemaphoreAndMem(sr.reservedMem)
- p.metrics.MessagesPending.Dec()
- p.metrics.BytesPending.Sub(float64(size))
- p.log.WithError(errProducerClosed).
- WithField("size", size).
- WithField("properties", sr.msg.Properties)
- }
-
- if sr.callback != nil {
- sr.callbackOnce.Do(func() {
- runCallback(sr.callback, nil, sr.msg, errProducerClosed)
- })
- }
- if sr.transaction != nil {
- sr.transaction.endSendOrAckOp(nil)
- }
+ sr.done(nil, errProducerClosed)
}
// flag the sending has completed with error, flush make no effect
@@ -1448,19 +1388,29 @@ func (p *partitionProducer) Close() {
}
type sendRequest struct {
- ctx context.Context
- msg *ProducerMessage
- callback func(MessageID, *ProducerMessage, error)
- callbackOnce *sync.Once
- publishTime time.Time
- flushImmediately bool
- totalChunks int
- chunkID int
- uuid string
- chunkRecorder *chunkRecorder
- transaction *transaction
- reservedMem int64
+ ctx context.Context
+ msg *ProducerMessage
+ producer *partitionProducer
+ callback func(MessageID, *ProducerMessage, error)
+ callbackOnce *sync.Once
+ publishTime time.Time
+ flushImmediately bool
+ totalChunks int
+ chunkID int
+ uuid string
+ chunkRecorder *chunkRecorder
+
+ /// resource management
+
+ memLimit internal.MemoryLimitController
+ reservedMem int64
+ semaphore internal.Semaphore
+ reservedSemaphore int
+
+ /// convey settable state
+
sendAsBatch bool
+ transaction *transaction
schema Schema
schemaVersion []byte
uncompressedPayload []byte
@@ -1473,6 +1423,116 @@ type sendRequest struct {
maxMessageSize int32
}
+func (sr *sendRequest) done(msgID MessageID, err error) {
+ if err == nil {
+ sr.producer.metrics.PublishLatency.Observe(float64(time.Now().UnixNano()-sr.publishTime.UnixNano()) / 1.0e9)
+ sr.producer.metrics.MessagesPublished.Inc()
+ sr.producer.metrics.BytesPublished.Add(float64(sr.reservedMem))
+
+ if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
+ if sr.producer.options.Interceptors != nil {
+ sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID)
+ }
+ }
+ }
+
+ if err != nil {
+ sr.producer.log.WithError(err).
+ WithField("size", sr.reservedMem).
+ WithField("properties", sr.msg.Properties)
+ }
+
+ if errors.Is(err, errSendTimeout) {
+ sr.producer.metrics.PublishErrorsTimeout.Inc()
+ }
+
+ if errors.Is(err, errMessageTooLarge) {
+ sr.producer.metrics.PublishErrorsMsgTooLarge.Inc()
+ }
+
+ if sr.semaphore != nil {
+ sr.semaphore.Release()
+ sr.producer.metrics.MessagesPending.Dec()
+ }
+
+ if sr.memLimit != nil {
+ sr.memLimit.ReleaseMemory(sr.reservedMem)
+ sr.producer.metrics.BytesPending.Sub(float64(sr.reservedMem))
+ }
+
+ // sr.chunkID == -1 means a chunked message is not yet prepared, so that we should fail it immediately
+ if sr.totalChunks <= 1 || sr.chunkID == -1 || sr.chunkID == sr.totalChunks-1 {
+ sr.callbackOnce.Do(func() {
+ runCallback(sr.callback, msgID, sr.msg, err)
+ })
+
+ if sr.transaction != nil {
+ sr.transaction.endSendOrAckOp(err)
+ }
+ }
+}
+
+func (p *partitionProducer) reserveSemaphore(sr *sendRequest) error {
+ for i := 0; i < sr.totalChunks; i++ {
+ if p.options.DisableBlockIfQueueFull {
+ if !p.publishSemaphore.TryAcquire() {
+ return errSendQueueIsFull
+ }
+
+ // update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case
+ // of that only a part of the chunks acquire succeed
+ sr.semaphore = p.publishSemaphore
+ sr.reservedSemaphore++
+ p.metrics.MessagesPending.Inc()
+ } else {
+ if !p.publishSemaphore.Acquire(sr.ctx) {
+ return errContextExpired
+ }
+
+ // update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case
+ // of that only a part of the chunks acquire succeed
+ sr.semaphore = p.publishSemaphore
+ sr.reservedSemaphore++
+ p.metrics.MessagesPending.Inc()
+ }
+ }
+
+ return nil
+}
+
+func (p *partitionProducer) reserveMem(sr *sendRequest) error {
+ requiredMem := sr.uncompressedSize
+ if !sr.sendAsBatch {
+ requiredMem = int64(sr.compressedSize)
+ }
+
+ if p.options.DisableBlockIfQueueFull {
+ if !p.client.memLimit.TryReserveMemory(requiredMem) {
+ return errMemoryBufferIsFull
+ }
+
+ } else {
+ if !p.client.memLimit.ReserveMemory(sr.ctx, requiredMem) {
+ return errContextExpired
+ }
+ }
+
+ sr.memLimit = p.client.memLimit
+ sr.reservedMem += requiredMem
+ p.metrics.BytesPending.Add(float64(requiredMem))
+ return nil
+}
+
+func (p *partitionProducer) reserveResources(sr *sendRequest) error {
+ if err := p.reserveSemaphore(sr); err != nil {
+ return err
+ }
+ if err := p.reserveMem(sr); err != nil {
+ return err
+ }
+ return nil
+}
+
type closeProducer struct {
doneCh chan struct{}
}
@@ -1502,53 +1562,12 @@ func (p *partitionProducer) _setConn(conn internal.Connection) {
// _getConn returns internal connection field of this partition producer atomically.
// Note: should only be called by this partition producer before attempting to use the connection
func (p *partitionProducer) _getConn() internal.Connection {
- // Invariant: The conn must be non-nil for the lifetime of the partitionProducer.
+ // Invariant: p.conn must be non-nil for the lifetime of the partitionProducer.
// For this reason we leave this cast unchecked and panic() if the
// invariant is broken
return p.conn.Load().(internal.Connection)
}
-func (p *partitionProducer) releaseSemaphoreAndMem(size int64) {
- p.publishSemaphore.Release()
- p.client.memLimit.ReleaseMemory(size)
-}
-
-func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool {
- if p.options.DisableBlockIfQueueFull {
- if !p.publishSemaphore.TryAcquire() {
- runCallback(sr.callback, nil, sr.msg, errSendQueueIsFull)
- return false
- }
- } else {
- if !p.publishSemaphore.Acquire(sr.ctx) {
- runCallback(sr.callback, nil, sr.msg, errContextExpired)
- return false
- }
- }
- p.metrics.MessagesPending.Inc()
- return true
-}
-
-func (p *partitionProducer) canReserveMem(sr *sendRequest, size int64) bool {
- if p.options.DisableBlockIfQueueFull {
- if !p.client.memLimit.TryReserveMemory(size) {
- p.publishSemaphore.Release()
- runCallback(sr.callback, nil, sr.msg, errMemoryBufferIsFull)
- return false
- }
-
- } else {
- if !p.client.memLimit.ReserveMemory(sr.ctx, size) {
- p.publishSemaphore.Release()
- runCallback(sr.callback, nil, sr.msg, errContextExpired)
- return false
- }
- }
- sr.reservedMem += size
- p.metrics.BytesPending.Add(float64(size))
- return true
-}
-
type chunkRecorder struct {
chunkedMsgID chunkMessageID
}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 29ffa780..49e225f3 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -2079,7 +2079,6 @@ func TestMemLimitRejectProducerMessagesWithSchema(t *testing.T) {
}
func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) {
-
c, err := NewClient(ClientOptions{
URL: serviceURL,
MemoryLimitBytes: 5 * 1024,
@@ -2136,12 +2135,11 @@ func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) {
SendTimeout: 2 * time.Second,
})
- // producer2 will reserve 2*1024 bytes and then release 1024 byte (release the second chunk)
- // because it reaches MaxPendingMessages in chunking
+ // producer3 cannot reserve 2*1024 bytes because it reaches MaxPendingMessages in chunking
_, _ = producer3.Send(context.Background(), &ProducerMessage{
Payload: make([]byte, 2*1024),
})
- assert.Equal(t, int64(1024), c.(*client).memLimit.CurrentUsage())
+ assert.Zero(t, c.(*client).memLimit.CurrentUsage())
}
func TestMemLimitContextCancel(t *testing.T) {
diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go
index 385b197e..74e8dd0c 100644
--- a/pulsar/transaction_test.go
+++ b/pulsar/transaction_test.go
@@ -458,7 +458,6 @@ func TestAckChunkMessage(t *testing.T) {
// Create transaction and register the send operation.
txn, err := client.NewTransaction(time.Hour)
require.Nil(t, err)
- txn.(*transaction).registerSendOrAckOp()
// Create a producer with chunking enabled to send a large message that will be split into chunks.
producer, err := client.CreateProducer(ProducerOptions{