You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by cc...@apache.org on 2021/11/11 18:08:05 UTC

[pulsar-client-go] branch master updated: [Issue 662] Fix race in connection.go waitUntilReady() (#663)

This is an automated email from the ASF dual-hosted git repository.

cckellogg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 774769d  [Issue 662] Fix race in connection.go waitUntilReady() (#663)
774769d is described below

commit 774769dccb9ae59b76142e387d8866d1611482e8
Author: Ben Schofield <bs...@users.noreply.github.com>
AuthorDate: Thu Nov 11 18:07:57 2021 +0000

    [Issue 662] Fix race in connection.go waitUntilReady() (#663)
    
    * Fix issue 662: race in connection.go waitUntilReady().
    
    * c.Lock() is the same as c.cond.L.Lock(), because sync.NewCond takes a Locker as an argument. Add comments to make it clear why this lock is required, to hopefully avoid accidental removal in future.
    
    * Clarify comment.
    
    Co-authored-by: ben <be...@cyber.casa>
---
 pulsar/internal/connection.go | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 57067ca..ecda4fa 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -327,6 +327,9 @@ func (c *connection) doHandshake() bool {
 }
 
 func (c *connection) waitUntilReady() error {
+	// If we are going to call cond.Wait() at all, then we must call it _before_ we call cond.Broadcast().
+	// The lock is held here to prevent changeState() from calling cond.Broadcast() in the time between
+	// the state check and call to cond.Wait().
 	c.Lock()
 	defer c.Unlock()
 
@@ -894,6 +897,11 @@ func (c *connection) Close() {
 }
 
 func (c *connection) changeState(state connectionState) {
+	// Hold the connection lock across setState() and cond.Broadcast() so that waitUntilReady(), which
+	// takes the same lock, cannot check the old state and then call cond.Wait() after the broadcast.
+	c.Lock()
+	defer c.Unlock()
+
 	c.setState(state)
 	c.cond.Broadcast()
 }