You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/05/20 21:38:37 UTC

[pulsar-client-go] branch master updated: Always check connection close channel before attempting to put requests (#521)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new af9d58e  Always check connection close channel before attempting to put requests (#521)
af9d58e is described below

commit af9d58ea4d0ecc3596f8e4d8abab4242c7da7de4
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu May 20 14:36:15 2021 -0700

    Always check connection close channel before attempting to put requests (#521)
---
 pulsar/internal/connection.go | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 1c264c6..9cc1238 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -549,10 +549,15 @@ func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
 	if c.getState() == connectionClosed {
 		callback(req, ErrConnectionClosed)
 	} else {
-		c.incomingRequestsCh <- &request{
+		select {
+		case <-c.closeCh:
+			callback(req, ErrConnectionClosed)
+
+		case c.incomingRequestsCh <- &request{
 			id:       &requestID,
 			cmd:      req,
 			callback: callback,
+		}:
 		}
 	}
 }
@@ -562,12 +567,17 @@ func (c *connection) SendRequestNoWait(req *pb.BaseCommand) error {
 		return ErrConnectionClosed
 	}
 
-	c.incomingRequestsCh <- &request{
+	select {
+	case <-c.closeCh:
+		return ErrConnectionClosed
+
+	case c.incomingRequestsCh <- &request{
 		id:       nil,
 		cmd:      req,
 		callback: nil,
+	}:
+		return nil
 	}
-	return nil
 }
 
 func (c *connection) internalSendRequest(req *request) {