You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/23 19:43:56 UTC

[pulsar-client-go] branch master updated: Fix using closed connection in consumer (#785)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e2422b  Fix using closed connection in consumer (#785)
1e2422b is described below

commit 1e2422bc5cfec92e7fec5df5e48b8e766a2d6288
Author: hrsakai <hs...@yahoo-corp.jp>
AuthorDate: Fri Jun 24 04:43:52 2022 +0900

    Fix using closed connection in consumer (#785)
---
 pulsar/consumer_partition.go  |  6 +++++-
 pulsar/internal/connection.go | 12 +++++++-----
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 1ddcc39..4cc1645 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1187,7 +1187,11 @@ func (pc *partitionConsumer) grabConn() error {
 
 	pc._setConn(res.Cnx)
 	pc.log.Info("Connected consumer")
-	pc._getConn().AddConsumeHandler(pc.consumerID, pc)
+	err = pc._getConn().AddConsumeHandler(pc.consumerID, pc)
+	if err != nil {
+		pc.log.WithError(err).Error("Failed to add consumer handler")
+		return err
+	}
 
 	msgType := res.Response.GetType()
 
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index fa8d055..0249020 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -53,8 +53,9 @@ type TLSOptions struct {
 }
 
 var (
-	errConnectionClosed       = errors.New("connection closed")
-	errUnableRegisterListener = errors.New("unable register listener when con closed")
+	errConnectionClosed        = errors.New("connection closed")
+	errUnableRegisterListener  = errors.New("unable register listener when con closed")
+	errUnableAddConsumeHandler = errors.New("unable add consumer handler when con closed")
 )
 
 // ConnectionListener is a user of a connection (eg. a producer or
@@ -75,7 +76,7 @@ type Connection interface {
 	WriteData(data Buffer)
 	RegisterListener(id uint64, listener ConnectionListener) error
 	UnregisterListener(id uint64)
-	AddConsumeHandler(id uint64, handler ConsumerHandler)
+	AddConsumeHandler(id uint64, handler ConsumerHandler) error
 	DeleteConsumeHandler(id uint64)
 	ID() string
 	GetMaxMessageSize() int32
@@ -994,16 +995,17 @@ func (c *connection) getTLSConfig() (*tls.Config, error) {
 	return tlsConfig, nil
 }
 
-func (c *connection) AddConsumeHandler(id uint64, handler ConsumerHandler) {
+func (c *connection) AddConsumeHandler(id uint64, handler ConsumerHandler) error {
 	// do not add if connection is closed
 	if c.closed() {
 		c.log.Warnf("Closed connection unable add consumer with id=%+v", id)
-		return
+		return errUnableAddConsumeHandler
 	}
 
 	c.consumerHandlersLock.Lock()
 	defer c.consumerHandlersLock.Unlock()
 	c.consumerHandlers[id] = handler
+	return nil
 }
 
 func (c *connection) DeleteConsumeHandler(id uint64) {