You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2022/04/20 13:33:12 UTC
[pulsar-client-go] branch master updated: Fix producer unable register when cnx closed (#761)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new e3f625a Fix producer unable register when cnx closed (#761)
e3f625a is described below
commit e3f625ae8da938f5d147bdddd3a2cadced69c07b
Author: xiaolong ran <xi...@tencent.com>
AuthorDate: Wed Apr 20 21:33:07 2022 +0800
Fix producer unable register when cnx closed (#761)
* Fix producer unable register when cnx closed
Signed-off-by: xiaolongran <xi...@tencent.com>
* fix code style
Signed-off-by: xiaolongran <xi...@tencent.com>
---
pulsar/internal/connection.go | 10 ++++++----
pulsar/producer_partition.go | 7 +++++--
2 files changed, 11 insertions(+), 6 deletions(-)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 6055252..11c9d49 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -53,7 +53,8 @@ type TLSOptions struct {
}
var (
- errConnectionClosed = errors.New("connection closed")
+ errConnectionClosed = errors.New("connection closed")
+ errUnableRegisterListener = errors.New("unable register listener when con closed")
)
// ConnectionListener is a user of a connection (eg. a producer or
@@ -72,7 +73,7 @@ type Connection interface {
SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
SendRequestNoWait(req *pb.BaseCommand) error
WriteData(data Buffer)
- RegisterListener(id uint64, listener ConnectionListener)
+ RegisterListener(id uint64, listener ConnectionListener) error
UnregisterListener(id uint64)
AddConsumeHandler(id uint64, handler ConsumerHandler)
DeleteConsumeHandler(id uint64)
@@ -847,17 +848,18 @@ func (c *connection) handleCloseProducer(closeProducer *pb.CommandCloseProducer)
}
}
-func (c *connection) RegisterListener(id uint64, listener ConnectionListener) {
+func (c *connection) RegisterListener(id uint64, listener ConnectionListener) error {
// do not add if connection is closed
if c.closed() {
c.log.Warnf("Connection closed unable register listener id=%+v", id)
- return
+ return errUnableRegisterListener
}
c.listenersLock.Lock()
defer c.listenersLock.Unlock()
c.listeners[id] = listener
+ return nil
}
func (c *connection) UnregisterListener(id uint64) {
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index d37d60e..bc775e9 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -262,11 +262,14 @@ func (p *partitionProducer) grabCnx() error {
p.sequenceIDGenerator = &nextSequenceID
}
p._setConn(res.Cnx)
- p._getConn().RegisterListener(p.producerID, p)
+ err = p._getConn().RegisterListener(p.producerID, p)
+ if err != nil {
+ return err
+ }
p.log.WithFields(log.Fields{
"cnx": res.Cnx.ID(),
"epoch": atomic.LoadUint64(&p.epoch),
- }).Debug("Connected producer")
+ }).Info("Connected producer")
pendingItems := p.pendingQueue.ReadableSlice()
viewSize := len(pendingItems)