You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/23 19:43:56 UTC
[pulsar-client-go] branch master updated: Fix using closed connection in consumer (#785)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 1e2422b Fix using closed connection in consumer (#785)
1e2422b is described below
commit 1e2422bc5cfec92e7fec5df5e48b8e766a2d6288
Author: hrsakai <hs...@yahoo-corp.jp>
AuthorDate: Fri Jun 24 04:43:52 2022 +0900
Fix using closed connection in consumer (#785)
---
pulsar/consumer_partition.go | 6 +++++-
pulsar/internal/connection.go | 12 +++++++-----
2 files changed, 12 insertions(+), 6 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 1ddcc39..4cc1645 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1187,7 +1187,11 @@ func (pc *partitionConsumer) grabConn() error {
pc._setConn(res.Cnx)
pc.log.Info("Connected consumer")
- pc._getConn().AddConsumeHandler(pc.consumerID, pc)
+ err = pc._getConn().AddConsumeHandler(pc.consumerID, pc)
+ if err != nil {
+ pc.log.WithError(err).Error("Failed to add consumer handler")
+ return err
+ }
msgType := res.Response.GetType()
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index fa8d055..0249020 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -53,8 +53,9 @@ type TLSOptions struct {
}
var (
- errConnectionClosed = errors.New("connection closed")
- errUnableRegisterListener = errors.New("unable register listener when con closed")
+ errConnectionClosed = errors.New("connection closed")
+ errUnableRegisterListener = errors.New("unable register listener when con closed")
+ errUnableAddConsumeHandler = errors.New("unable add consumer handler when con closed")
)
// ConnectionListener is a user of a connection (eg. a producer or
@@ -75,7 +76,7 @@ type Connection interface {
WriteData(data Buffer)
RegisterListener(id uint64, listener ConnectionListener) error
UnregisterListener(id uint64)
- AddConsumeHandler(id uint64, handler ConsumerHandler)
+ AddConsumeHandler(id uint64, handler ConsumerHandler) error
DeleteConsumeHandler(id uint64)
ID() string
GetMaxMessageSize() int32
@@ -994,16 +995,17 @@ func (c *connection) getTLSConfig() (*tls.Config, error) {
return tlsConfig, nil
}
-func (c *connection) AddConsumeHandler(id uint64, handler ConsumerHandler) {
+func (c *connection) AddConsumeHandler(id uint64, handler ConsumerHandler) error {
// do not add if connection is closed
if c.closed() {
c.log.Warnf("Closed connection unable add consumer with id=%+v", id)
- return
+ return errUnableAddConsumeHandler
}
c.consumerHandlersLock.Lock()
defer c.consumerHandlersLock.Unlock()
c.consumerHandlers[id] = handler
+ return nil
}
func (c *connection) DeleteConsumeHandler(id uint64) {