You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/05/22 08:54:40 UTC

[pulsar-client-go] branch master updated: fix fail to add batchbuilder (#260)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 416c5f6  fix fail to add batchbuilder (#260)
416c5f6 is described below

commit 416c5f6586cfa1107f7389c3d887fe1227358faa
Author: yarthur1 <30...@users.noreply.github.com>
AuthorDate: Fri May 22 16:54:33 2020 +0800

    fix fail to add batchbuilder (#260)
    
    Co-authored-by: yexijun <ye...@cmcm.com>
---
 pulsar/producer_partition.go | 38 ++++++++++++++++----------------------
 1 file changed, 16 insertions(+), 22 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 30ab0d9..63f7bf4 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
 	"context"
+	"errors"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -39,6 +40,8 @@ const (
 	producerClosed
 )
 
+var errFailAddBatch = errors.New("message send failed")
+
 type partitionProducer struct {
 	state  int32
 	client *client
@@ -265,35 +268,26 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		sequenceID = internal.GetAndAdd(p.sequenceIDGenerator, 1)
 	}
 
-	if sendAsBatch {
-		added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request,
-			msg.ReplicationClusters, deliverAt)
-		if !added {
-			// The current batch is full.. flush it and retry
-			p.internalFlushCurrentBatch()
+	added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request,
+		msg.ReplicationClusters, deliverAt)
+	if !added {
+		// The current batch is full; flush it and retry.
+		p.internalFlushCurrentBatch()
 
-			// after flushing try again to add the current payload
-			if ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request,
-				msg.ReplicationClusters, deliverAt); !ok {
-				p.log.WithField("size", len(msg.Payload)).
-					WithField("sequenceID", sequenceID).
-					WithField("properties", msg.Properties).
-					Error("unable to add message to batch")
-			}
-		}
-	} else {
-		// Send individually
-		if added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request,
-			msg.ReplicationClusters, deliverAt); !added {
+		// After flushing, try again to add the current payload.
+		if ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request,
+			msg.ReplicationClusters, deliverAt); !ok {
+			p.publishSemaphore.Release()
+			request.callback(nil, request.msg, errFailAddBatch)
 			p.log.WithField("size", len(msg.Payload)).
 				WithField("sequenceID", sequenceID).
 				WithField("properties", msg.Properties).
-				Error("unable to send single message")
+				Error("unable to add message to batch")
+			return
 		}
-		p.internalFlushCurrentBatch()
 	}
 
-	if request.flushImmediately {
+	if !sendAsBatch || request.flushImmediately {
 		p.internalFlushCurrentBatch()
 	}
 }