You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:51 UTC
[pulsar-client-go] 14/38: Resend pending messages after reconnection
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 427fe05cdec379e4a773a6a911a90fb6b3820506
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Apr 10 15:20:27 2019 -0700
Resend pending messages after reconnection
---
pulsar/impl/connection.go | 2 +-
pulsar/impl_partition_producer.go | 9 +++++++++
2 files changed, 10 insertions(+), 1 deletion(-)
diff --git a/pulsar/impl/connection.go b/pulsar/impl/connection.go
index fa32889..0cba51d 100644
--- a/pulsar/impl/connection.go
+++ b/pulsar/impl/connection.go
@@ -369,7 +369,7 @@ func (c *connection) Close() {
c.pingTicker.Stop()
close(c.incomingRequests)
close(c.writeRequests)
- //c.cnx = nil
+
for _, listener := range c.listeners {
listener.ConnectionClosed()
}
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index cdd34d5..5f50296 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -114,6 +114,13 @@ func (p *partitionProducer) grabCnx() error {
p.cnx = res.Cnx
p.cnx.RegisterListener(p.producerId, p)
p.log.WithField("cnx", res.Cnx).Debug("Connected producer")
+
+ if p.pendingQueue.Size() > 0 {
+ p.log.Infof("Resending %v pending batches", p.pendingQueue.Size())
+ for it := p.pendingQueue.Iterator(); it.HasNext(); {
+ p.cnx.WriteData(it.Next().(*pendingItem).batchData)
+ }
+ }
return nil
}
@@ -126,6 +133,7 @@ func (p *partitionProducer) ConnectionClosed() {
}
func (p *partitionProducer) reconnectToBroker() {
+ p.log.Info("Reconnecting to broker")
backoff := impl.Backoff{}
for {
err := p.grabCnx()
@@ -255,6 +263,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
p.pendingQueue.Poll()
+ p.publishSemaphore.Release()
for _ ,i := range pi.sendRequest {
sr := i.(*sendRequest)
sr.callback(nil, sr.msg, nil)