You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:51 UTC

[pulsar-client-go] 14/38: Resend pending messages after reconnection

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 427fe05cdec379e4a773a6a911a90fb6b3820506
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Apr 10 15:20:27 2019 -0700

    Resend pending messages after reconnection
---
 pulsar/impl/connection.go         | 2 +-
 pulsar/impl_partition_producer.go | 9 +++++++++
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/pulsar/impl/connection.go b/pulsar/impl/connection.go
index fa32889..0cba51d 100644
--- a/pulsar/impl/connection.go
+++ b/pulsar/impl/connection.go
@@ -369,7 +369,7 @@ func (c *connection) Close() {
 	c.pingTicker.Stop()
 	close(c.incomingRequests)
 	close(c.writeRequests)
-	//c.cnx = nil
+
 	for _, listener := range c.listeners {
 		listener.ConnectionClosed()
 	}
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index cdd34d5..5f50296 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -114,6 +114,13 @@ func (p *partitionProducer) grabCnx() error {
 	p.cnx = res.Cnx
 	p.cnx.RegisterListener(p.producerId, p)
 	p.log.WithField("cnx", res.Cnx).Debug("Connected producer")
+
+	if p.pendingQueue.Size() > 0 {
+		p.log.Infof("Resending %v pending batches", p.pendingQueue.Size())
+		for it := p.pendingQueue.Iterator(); it.HasNext(); {
+			p.cnx.WriteData(it.Next().(*pendingItem).batchData)
+		}
+	}
 	return nil
 }
 
@@ -126,6 +133,7 @@ func (p *partitionProducer) ConnectionClosed() {
 }
 
 func (p *partitionProducer) reconnectToBroker() {
+	p.log.Info("Reconnecting to broker")
 	backoff := impl.Backoff{}
 	for {
 		err := p.grabCnx()
@@ -255,6 +263,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 
 	// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
 	p.pendingQueue.Poll()
+	p.publishSemaphore.Release()
 	for _ ,i := range pi.sendRequest {
 		sr := i.(*sendRequest)
 		sr.callback(nil, sr.msg, nil)