You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2022/02/09 07:04:32 UTC
[pulsar-client-go] branch master updated: [Bugfix] producer runEventsLoop for reconnect early exit (#721)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 16e8b81 [Bugfix] producer runEventsLoop for reconnect early exit (#721)
16e8b81 is described below
commit 16e8b8114615146d645f2947afeb5f3cedbc85a8
Author: billowqiu <bi...@163.com>
AuthorDate: Wed Feb 9 15:04:26 2022 +0800
[Bugfix] producer runEventsLoop for reconnect early exit (#721)
* Fix closed connection leak
* Fix closed connection leak
* bugfix: runEventsLoop for reconnect early exit
* [optimize] add log when reconnect
* [Bugfix]fix panic
* [Bugfix]remove log conn ID
* [optimize]Distinguish failed create producer log
---
pulsar/consumer_partition.go | 3 +++
pulsar/internal/connection_pool.go | 6 +++---
pulsar/producer_partition.go | 21 +++++++++++++--------
3 files changed, 19 insertions(+), 11 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 9bd4a94..04a39c5 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -894,6 +894,7 @@ func (pc *partitionConsumer) runEventsLoop() {
for {
select {
case <-pc.closeCh:
+ pc.log.Info("close consumer, exit reconnect")
return
case <-pc.connectClosedCh:
pc.log.Debug("runEventsLoop will reconnect")
@@ -992,6 +993,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
for maxRetry != 0 {
if pc.getConsumerState() != consumerReady {
// Consumer is already closing
+ pc.log.Info("consumer state not ready, exit reconnect")
return
}
@@ -1005,6 +1007,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
pc.log.Info("Reconnected consumer to broker")
return
}
+ pc.log.WithError(err).Error("Failed to create consumer at reconnect")
errMsg := err.Error()
if strings.Contains(errMsg, errTopicNotFount) {
// when topic is deleted, we should give up reconnection.
diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go
index 4787ba1..db67c25 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -75,15 +75,15 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
p.Lock()
conn, ok := p.connections[key]
if ok {
- p.log.Infof("Found connection in pool key=%s logical_addr=%+v physical_addr=%+v",
+ p.log.Debugf("Found connection in pool key=%s logical_addr=%+v physical_addr=%+v",
key, conn.logicalAddr, conn.physicalAddr)
// remove stale/failed connection
if conn.closed() {
- delete(p.connections, key)
- conn.Close()
p.log.Infof("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v",
key, conn.logicalAddr, conn.physicalAddr)
+ delete(p.connections, key)
+ conn.Close()
conn = nil // set to nil so we create a new one
}
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 3f1e54b..913c33c 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -142,7 +142,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
}
err := p.grabCnx()
if err != nil {
- logger.WithError(err).Error("Failed to create producer")
+ logger.WithError(err).Error("Failed to create producer at newPartitionProducer")
return nil, err
}
@@ -209,7 +209,7 @@ func (p *partitionProducer) grabCnx() error {
}
res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
if err != nil {
- p.log.WithError(err).Error("Failed to create producer")
+ p.log.WithError(err).Error("Failed to create producer at send PRODUCER request")
return err
}
@@ -324,6 +324,7 @@ func (p *partitionProducer) reconnectToBroker() {
for maxRetry != 0 {
if p.getProducerState() != producerReady {
// Producer is already closing
+ p.log.Info("producer state not ready, exit reconnect")
return
}
@@ -337,6 +338,7 @@ func (p *partitionProducer) reconnectToBroker() {
p.log.WithField("cnx", p._getConn().ID()).Info("Reconnected producer to broker")
return
}
+ p.log.WithError(err).Error("Failed to create producer at reconnect")
errMsg := err.Error()
if strings.Contains(errMsg, errTopicNotFount) {
// when topic is deleted, we should give up reconnection.
@@ -352,12 +354,15 @@ func (p *partitionProducer) reconnectToBroker() {
func (p *partitionProducer) runEventsLoop() {
go func() {
- select {
- case <-p.closeCh:
- return
- case <-p.connectClosedCh:
- p.log.Info("runEventsLoop will reconnect in producer")
- p.reconnectToBroker()
+ for {
+ select {
+ case <-p.closeCh:
+ p.log.Info("close producer, exit reconnect")
+ return
+ case <-p.connectClosedCh:
+ p.log.Info("runEventsLoop will reconnect in producer")
+ p.reconnectToBroker()
+ }
}
}()