You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/02/09 03:39:29 UTC

[GitHub] [pulsar-client-go] yuanweikang2020 commented on issue #462: goroutine leak

yuanweikang2020 commented on issue #462:
URL: https://github.com/apache/pulsar-client-go/issues/462#issuecomment-775637212


   > thanks @yuanweikang2020 for this issue, can you provide more context how to reproduce or verify this issue?
   
   when pulsar is failure, pulsar client reconnect
   ```
   func (p *partitionProducer) runEventsLoop() {
   	for {
   		select {
   		case i := <-p.eventsChan:
   			switch v := i.(type) {
   			case *sendRequest:
   				p.internalSend(v)
   			case *connectionClosed:
   				p.reconnectToBroker()
   			case *flushRequest:
   				p.internalFlush(v)
   			case *closeProducer:
   				p.internalClose(v)
   				return
   			}
   
   		case <-p.batchFlushTicker.C:
   			p.internalFlushCurrentBatch()
   		}
   	}
   }
   ```
   when reconnect pulsar Broker
   ```
   func (p *partitionProducer) reconnectToBroker() {
   	backoff := internal.Backoff{}
   	for {
   		if p.state != producerReady {
   			// Producer is already closing
   			return
   		}
   
   		d := backoff.Next()
   		p.log.Info("Reconnecting to broker in ", d)
   		time.Sleep(d)
   
   		err := p.grabCnx()
   		if err == nil {
   			// Successfully reconnected
   			p.log.WithField("cnx", p.cnx.ID()).Info("Reconnected producer to broker")
   			return
   		}
   	}
   }
   ```
   get connection frequently
   ```
   func (c *rpcClient) getConn(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) {
   	cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
   	backoff := Backoff{1 * time.Second}
   	startTime := time.Now()
   	var retryTime time.Duration
   	if err != nil {
   		for time.Since(startTime) < c.requestTimeout {
   			retryTime = backoff.Next()
   			c.log.Debugf("Reconnecting to broker in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
   			time.Sleep(retryTime)
   			cnx, err = c.pool.GetConnection(logicalAddr, physicalAddr)
   			if err == nil {
   				c.log.Debugf("retry connection success")
   				return cnx, nil
   			}
   		}
   		return nil, err
   	}
   	return cnx, nil
   }
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org