You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/15 18:36:42 UTC

[pulsar-client-go] branch master updated: Fixed producer.Flush() (#5)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new d6a6187  Fixed producer.Flush() (#5)
d6a6187 is described below

commit d6a61879ed4943fc5261516b433e8b003e786683
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed May 15 11:36:37 2019 -0700

    Fixed producer.Flush() (#5)
---
 pulsar/impl_partition_producer.go | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 19947c1..215916d 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -281,6 +281,7 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
 
 	pi := p.pendingQueue.PeekLast().(*pendingItem)
 	pi.sendRequests = append(pi.sendRequests, &sendRequest{
+		msg: nil,
 		callback: func(id MessageID, message *ProducerMessage, e error) {
 			fr.err = e
 			fr.waitGroup.Done()
@@ -337,9 +338,12 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 	p.pendingQueue.Poll()
 	for idx, i := range pi.sendRequests {
 		sr := i.(*sendRequest)
-		atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceId))
-		if sr.callback != nil {
+		if sr.msg != nil {
+			atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceId))
 			p.publishSemaphore.Release()
+		}
+
+		if sr.callback != nil {
 			msgID := newMessageId(
 				int64(response.MessageId.GetLedgerId()),
 				int64(response.MessageId.GetEntryId()),