You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/07/29 04:50:34 UTC

[pulsar-client-go] branch master updated: Avoid producer deadlock on connection closing (#337)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c0cba32  Avoid producer deadlock on connection closing (#337)
c0cba32 is described below

commit c0cba320e933fad15469e7f51219c2d0f0d96dcf
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jul 28 21:50:24 2020 -0700

    Avoid producer deadlock on connection closing (#337)
    
    * Avoid producer deadlock on connection closing
    
    * Fixed constants init
    
    * Avoid creating timer instance each time, if channel is not full
    
    * Added debug statements
---
 pulsar/internal/connection.go | 47 ++++++++++++++++++++++++++++++++++---------
 1 file changed, 37 insertions(+), 10 deletions(-)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 4be9ba2..0e13380 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -107,14 +107,14 @@ type ConsumerHandler interface {
 	ConnectionClosed()
 }
 
-type connectionState int
+type connectionState int32
 
 const (
-	connectionInit connectionState = iota
-	connectionConnecting
-	connectionTCPConnected
-	connectionReady
-	connectionClosed
+	connectionInit         = 0
+	connectionConnecting   = 1
+	connectionTCPConnected = 2
+	connectionReady        = 3
+	connectionClosed       = 4
 )
 
 func (s connectionState) String() string {
@@ -150,7 +150,7 @@ type incomingCmd struct {
 type connection struct {
 	sync.Mutex
 	cond              *sync.Cond
-	state             connectionState
+	state             int32
 	connectionTimeout time.Duration
 
 	logicalAddr  *url.URL
@@ -190,7 +190,7 @@ type connection struct {
 func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
 	connectionTimeout time.Duration, auth auth.Provider) *connection {
 	cnx := &connection{
-		state:                connectionInit,
+		state:                int32(connectionInit),
 		connectionTimeout:    connectionTimeout,
 		logicalAddr:          logicalAddr,
 		physicalAddr:         physicalAddr,
@@ -397,7 +397,34 @@ func (c *connection) runPingCheck() {
 }
 
 func (c *connection) WriteData(data Buffer) {
-	c.writeRequestsCh <- data
+	select {
+	case c.writeRequestsCh <- data:
+		// Channel is not full
+		return
+
+	default:
+		// Channel full, fallback to probe if connection is closed
+	}
+
+	for {
+		select {
+		case c.writeRequestsCh <- data:
+			// Successfully wrote on the channel
+			return
+
+		case <-time.After(100 * time.Millisecond):
+			// The channel is either:
+			// 1. blocked, in which case we need to wait until we have space
+			// 2. the connection is already closed, then we need to bail out
+			c.log.Debug("Couldn't write on connection channel immediately")
+			state := connectionState(atomic.LoadInt32(&c.state))
+			if state != connectionReady {
+				c.log.Debug("Connection was already closed")
+				return
+			}
+		}
+	}
+
 }
 
 func (c *connection) internalWriteData(data Buffer) {
@@ -729,7 +756,7 @@ func (c *connection) Close() {
 
 func (c *connection) changeState(state connectionState) {
 	c.Lock()
-	c.state = state
+	atomic.StoreInt32(&c.state, int32(state))
 	c.cond.Broadcast()
 	c.Unlock()
 }