You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/07/29 04:50:34 UTC
[pulsar-client-go] branch master updated: Avoid producer deadlock on connection closing (#337)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new c0cba32 Avoid producer deadlock on connection closing (#337)
c0cba32 is described below
commit c0cba320e933fad15469e7f51219c2d0f0d96dcf
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jul 28 21:50:24 2020 -0700
Avoid producer deadlock on connection closing (#337)
* Avoid producer deadlock on connection closing
* Fixed constants init
* Avoid creating timer instance each time, if channel is not full
* Added debug statements
---
pulsar/internal/connection.go | 47 ++++++++++++++++++++++++++++++++++---------
1 file changed, 37 insertions(+), 10 deletions(-)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 4be9ba2..0e13380 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -107,14 +107,14 @@ type ConsumerHandler interface {
ConnectionClosed()
}
-type connectionState int
+type connectionState int32
const (
- connectionInit connectionState = iota
- connectionConnecting
- connectionTCPConnected
- connectionReady
- connectionClosed
+ connectionInit = 0
+ connectionConnecting = 1
+ connectionTCPConnected = 2
+ connectionReady = 3
+ connectionClosed = 4
)
func (s connectionState) String() string {
@@ -150,7 +150,7 @@ type incomingCmd struct {
type connection struct {
sync.Mutex
cond *sync.Cond
- state connectionState
+ state int32
connectionTimeout time.Duration
logicalAddr *url.URL
@@ -190,7 +190,7 @@ type connection struct {
func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
connectionTimeout time.Duration, auth auth.Provider) *connection {
cnx := &connection{
- state: connectionInit,
+ state: int32(connectionInit),
connectionTimeout: connectionTimeout,
logicalAddr: logicalAddr,
physicalAddr: physicalAddr,
@@ -397,7 +397,34 @@ func (c *connection) runPingCheck() {
}
func (c *connection) WriteData(data Buffer) {
- c.writeRequestsCh <- data
+ select {
+ case c.writeRequestsCh <- data:
+ // Channel is not full
+ return
+
+ default:
+ // Channel full, fallback to probe if connection is closed
+ }
+
+ for {
+ select {
+ case c.writeRequestsCh <- data:
+ // Successfully wrote on the channel
+ return
+
+ case <-time.After(100 * time.Millisecond):
+ // The channel is either:
+ // 1. blocked, in which case we need to wait until we have space
+ // 2. the connection is already closed, then we need to bail out
+ c.log.Debug("Couldn't write on connection channel immediately")
+ state := connectionState(atomic.LoadInt32(&c.state))
+ if state != connectionReady {
+ c.log.Debug("Connection was already closed")
+ return
+ }
+ }
+ }
+
}
func (c *connection) internalWriteData(data Buffer) {
@@ -729,7 +756,7 @@ func (c *connection) Close() {
func (c *connection) changeState(state connectionState) {
c.Lock()
- c.state = state
+ atomic.StoreInt32(&c.state, int32(state))
c.cond.Broadcast()
c.Unlock()
}