You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2022/02/09 07:04:32 UTC

[pulsar-client-go] branch master updated: [Bugfix] producer runEventsLoop for reconnect early exit (#721)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 16e8b81  [Bugfix] producer runEventsLoop for reconnect early exit (#721)
16e8b81 is described below

commit 16e8b8114615146d645f2947afeb5f3cedbc85a8
Author: billowqiu <bi...@163.com>
AuthorDate: Wed Feb 9 15:04:26 2022 +0800

    [Bugfix] producer runEventsLoop for reconnect early exit (#721)
    
    * Fix closed connection leak
    
    * Fix closed connection leak
    
    * bugfix: producer runEventsLoop exited early after the first reconnect
    
    * [optimize] add log when reconnecting
    
    * [Bugfix] fix panic
    
    * [Bugfix] remove conn ID from log
    
    * [optimize] distinguish the "Failed to create producer" log messages
---
 pulsar/consumer_partition.go       |  3 +++
 pulsar/internal/connection_pool.go |  6 +++---
 pulsar/producer_partition.go       | 21 +++++++++++++--------
 3 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 9bd4a94..04a39c5 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -894,6 +894,7 @@ func (pc *partitionConsumer) runEventsLoop() {
 		for {
 			select {
 			case <-pc.closeCh:
+				pc.log.Info("close consumer, exit reconnect")
 				return
 			case <-pc.connectClosedCh:
 				pc.log.Debug("runEventsLoop will reconnect")
@@ -992,6 +993,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
 	for maxRetry != 0 {
 		if pc.getConsumerState() != consumerReady {
 			// Consumer is already closing
+			pc.log.Info("consumer state not ready, exit reconnect")
 			return
 		}
 
@@ -1005,6 +1007,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
 			pc.log.Info("Reconnected consumer to broker")
 			return
 		}
+		pc.log.WithError(err).Error("Failed to create consumer at reconnect")
 		errMsg := err.Error()
 		if strings.Contains(errMsg, errTopicNotFount) {
 			// when topic is deleted, we should give up reconnection.
diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go
index 4787ba1..db67c25 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -75,15 +75,15 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
 	p.Lock()
 	conn, ok := p.connections[key]
 	if ok {
-		p.log.Infof("Found connection in pool key=%s logical_addr=%+v physical_addr=%+v",
+		p.log.Debugf("Found connection in pool key=%s logical_addr=%+v physical_addr=%+v",
 			key, conn.logicalAddr, conn.physicalAddr)
 
 		// remove stale/failed connection
 		if conn.closed() {
-			delete(p.connections, key)
-			conn.Close()
 			p.log.Infof("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v",
 				key, conn.logicalAddr, conn.physicalAddr)
+			delete(p.connections, key)
+			conn.Close()
 			conn = nil // set to nil so we create a new one
 		}
 	}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 3f1e54b..913c33c 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -142,7 +142,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 	}
 	err := p.grabCnx()
 	if err != nil {
-		logger.WithError(err).Error("Failed to create producer")
+		logger.WithError(err).Error("Failed to create producer at newPartitionProducer")
 		return nil, err
 	}
 
@@ -209,7 +209,7 @@ func (p *partitionProducer) grabCnx() error {
 	}
 	res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
 	if err != nil {
-		p.log.WithError(err).Error("Failed to create producer")
+		p.log.WithError(err).Error("Failed to create producer at send PRODUCER request")
 		return err
 	}
 
@@ -324,6 +324,7 @@ func (p *partitionProducer) reconnectToBroker() {
 	for maxRetry != 0 {
 		if p.getProducerState() != producerReady {
 			// Producer is already closing
+			p.log.Info("producer state not ready, exit reconnect")
 			return
 		}
 
@@ -337,6 +338,7 @@ func (p *partitionProducer) reconnectToBroker() {
 			p.log.WithField("cnx", p._getConn().ID()).Info("Reconnected producer to broker")
 			return
 		}
+		p.log.WithError(err).Error("Failed to create producer at reconnect")
 		errMsg := err.Error()
 		if strings.Contains(errMsg, errTopicNotFount) {
 			// when topic is deleted, we should give up reconnection.
@@ -352,12 +354,15 @@ func (p *partitionProducer) reconnectToBroker() {
 
 func (p *partitionProducer) runEventsLoop() {
 	go func() {
-		select {
-		case <-p.closeCh:
-			return
-		case <-p.connectClosedCh:
-			p.log.Info("runEventsLoop will reconnect in producer")
-			p.reconnectToBroker()
+		for {
+			select {
+			case <-p.closeCh:
+				p.log.Info("close producer, exit reconnect")
+				return
+			case <-p.connectClosedCh:
+				p.log.Info("runEventsLoop will reconnect in producer")
+				p.reconnectToBroker()
+			}
 		}
 	}()