You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/05/22 08:54:40 UTC
[pulsar-client-go] branch master updated: fix fail to add
batchbuilder (#260)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 416c5f6 fix fail to add batchbuilder (#260)
416c5f6 is described below
commit 416c5f6586cfa1107f7389c3d887fe1227358faa
Author: yarthur1 <30...@users.noreply.github.com>
AuthorDate: Fri May 22 16:54:33 2020 +0800
fix fail to add batchbuilder (#260)
Co-authored-by: yexijun <ye...@cmcm.com>
---
pulsar/producer_partition.go | 38 ++++++++++++++++----------------------
1 file changed, 16 insertions(+), 22 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 30ab0d9..63f7bf4 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -19,6 +19,7 @@ package pulsar
import (
"context"
+ "errors"
"sync"
"sync/atomic"
"time"
@@ -39,6 +40,8 @@ const (
producerClosed
)
+var errFailAddBatch = errors.New("message send failed")
+
type partitionProducer struct {
state int32
client *client
@@ -265,35 +268,26 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
sequenceID = internal.GetAndAdd(p.sequenceIDGenerator, 1)
}
- if sendAsBatch {
- added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request,
- msg.ReplicationClusters, deliverAt)
- if !added {
- // The current batch is full.. flush it and retry
- p.internalFlushCurrentBatch()
+ added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request,
+ msg.ReplicationClusters, deliverAt)
+ if !added {
+ // The current batch is full.. flush it and retry
+ p.internalFlushCurrentBatch()
- // after flushing try again to add the current payload
- if ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request,
- msg.ReplicationClusters, deliverAt); !ok {
- p.log.WithField("size", len(msg.Payload)).
- WithField("sequenceID", sequenceID).
- WithField("properties", msg.Properties).
- Error("unable to add message to batch")
- }
- }
- } else {
- // Send individually
- if added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request,
- msg.ReplicationClusters, deliverAt); !added {
+ // after flushing try again to add the current payload
+ if ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request,
+ msg.ReplicationClusters, deliverAt); !ok {
+ p.publishSemaphore.Release()
+ request.callback(nil, request.msg, errFailAddBatch)
p.log.WithField("size", len(msg.Payload)).
WithField("sequenceID", sequenceID).
WithField("properties", msg.Properties).
- Error("unable to send single message")
+ Error("unable to add message to batch")
+ return
}
- p.internalFlushCurrentBatch()
}
- if request.flushImmediately {
+ if !sendAsBatch || request.flushImmediately {
p.internalFlushCurrentBatch()
}
}