You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2022/04/20 13:33:12 UTC

[pulsar-client-go] branch master updated: Fix producer unable register when cnx closed (#761)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new e3f625a  Fix producer unable register when cnx closed (#761)
e3f625a is described below

commit e3f625ae8da938f5d147bdddd3a2cadced69c07b
Author: xiaolong ran <xi...@tencent.com>
AuthorDate: Wed Apr 20 21:33:07 2022 +0800

    Fix producer unable register when cnx closed (#761)
    
    * Fix producer unable register when cnx closed
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
    
    * fix code style
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
---
 pulsar/internal/connection.go | 10 ++++++----
 pulsar/producer_partition.go  |  7 +++++--
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 6055252..11c9d49 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -53,7 +53,8 @@ type TLSOptions struct {
 }
 
 var (
-	errConnectionClosed = errors.New("connection closed")
+	errConnectionClosed       = errors.New("connection closed")
+	errUnableRegisterListener = errors.New("unable register listener when con closed")
 )
 
 // ConnectionListener is a user of a connection (eg. a producer or
@@ -72,7 +73,7 @@ type Connection interface {
 	SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
 	SendRequestNoWait(req *pb.BaseCommand) error
 	WriteData(data Buffer)
-	RegisterListener(id uint64, listener ConnectionListener)
+	RegisterListener(id uint64, listener ConnectionListener) error
 	UnregisterListener(id uint64)
 	AddConsumeHandler(id uint64, handler ConsumerHandler)
 	DeleteConsumeHandler(id uint64)
@@ -847,17 +848,18 @@ func (c *connection) handleCloseProducer(closeProducer *pb.CommandCloseProducer)
 	}
 }
 
-func (c *connection) RegisterListener(id uint64, listener ConnectionListener) {
+func (c *connection) RegisterListener(id uint64, listener ConnectionListener) error {
 	// do not add if connection is closed
 	if c.closed() {
 		c.log.Warnf("Connection closed unable register listener id=%+v", id)
-		return
+		return errUnableRegisterListener
 	}
 
 	c.listenersLock.Lock()
 	defer c.listenersLock.Unlock()
 
 	c.listeners[id] = listener
+	return nil
 }
 
 func (c *connection) UnregisterListener(id uint64) {
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index d37d60e..bc775e9 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -262,11 +262,14 @@ func (p *partitionProducer) grabCnx() error {
 		p.sequenceIDGenerator = &nextSequenceID
 	}
 	p._setConn(res.Cnx)
-	p._getConn().RegisterListener(p.producerID, p)
+	err = p._getConn().RegisterListener(p.producerID, p)
+	if err != nil {
+		return err
+	}
 	p.log.WithFields(log.Fields{
 		"cnx":   res.Cnx.ID(),
 		"epoch": atomic.LoadUint64(&p.epoch),
-	}).Debug("Connected producer")
+	}).Info("Connected producer")
 
 	pendingItems := p.pendingQueue.ReadableSlice()
 	viewSize := len(pendingItems)