You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by cc...@apache.org on 2021/11/11 18:08:05 UTC
[pulsar-client-go] branch master updated: [Issue 662] Fix race in connection.go waitUntilReady() (#663)
This is an automated email from the ASF dual-hosted git repository.
cckellogg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 774769d [Issue 662] Fix race in connection.go waitUntilReady() (#663)
774769d is described below
commit 774769dccb9ae59b76142e387d8866d1611482e8
Author: Ben Schofield <bs...@users.noreply.github.com>
AuthorDate: Thu Nov 11 18:07:57 2021 +0000
[Issue 662] Fix race in connection.go waitUntilReady() (#663)
* Fix issue 662: race in connection.go waitUntilReady().
* c.Lock() is the same as c.cond.L.Lock(), because sync.NewCond takes a Locker as an argument and exposes it as the Cond's L field. Add comments to make it clear why this lock is required, to hopefully avoid accidental removal in the future.
* Clarify comment.
Co-authored-by: ben <be...@cyber.casa>
---
pulsar/internal/connection.go | 8 ++++++++
1 file changed, 8 insertions(+)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 57067ca..ecda4fa 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -327,6 +327,9 @@ func (c *connection) doHandshake() bool {
}
func (c *connection) waitUntilReady() error {
+ // If we are going to call cond.Wait() at all, then we must call it _before_ we call cond.Broadcast().
+ // The lock is held here to prevent changeState() from calling cond.Broadcast() in the time between
+ // the state check and call to cond.Wait().
c.Lock()
defer c.Unlock()
@@ -894,6 +897,11 @@ func (c *connection) Close() {
}
func (c *connection) changeState(state connectionState) {
+ // The lock is held here because we need setState() and cond.Broadcast() to be
+ // an atomic operation from the point of view of waitUntilReady().
+ c.Lock()
+ defer c.Unlock()
+
c.setState(state)
c.cond.Broadcast()
}