Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/08 00:04:12 UTC

[GitHub] [pulsar-client-go] dferstay opened a new pull request #535: Fix data race while accessing connection in partitionConsumer

dferstay opened a new pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535


   The partitionConsumer maintains a few internal go-routines, two of which
   access the underlying internal.Connection.  The main runEventsLoop()
   go-routine reads the connection field while a separate go-routine is used
   to detect connection loss, initiate reconnection, and set the connection.
   
   Previously, access to the conn field was not synchronized.
   
   Now, the conn field is read and written atomically, avoiding race
   conditions.
   
   Signed-off-by: Daniel Ferstay <df...@splunk.com>
   
   ### Motivation
   
   While attempting to submit a separate PR (https://github.com/apache/pulsar-client-go/pull/534) I found that the `pulsar/reader_test` consistently failed with the following data race. The change in this PR is an attempt to fix it.
   
   ```
   2021-06-07T22:33:43.1825587Z ==================
   2021-06-07T22:33:43.1825992Z WARNING: DATA RACE
   2021-06-07T22:33:43.1826513Z Read at 0x00c0003de4a8 by goroutine 463:
   2021-06-07T22:33:43.1828038Z   github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).requestGetLastMessageID()
   2021-06-07T22:33:43.1829418Z       /pulsar-client-go/pulsar/consumer_partition.go:279 +0x27e
   2021-06-07T22:33:43.1830873Z   github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).internalGetLastMessageID()
   2021-06-07T22:33:43.1832427Z       /pulsar-client-go/pulsar/consumer_partition.go:270 +0xea
   2021-06-07T22:33:43.1833799Z   github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).runEventsLoop()
   2021-06-07T22:33:43.1835074Z       /pulsar-client-go/pulsar/consumer_partition.go:806 +0x2cb
   2021-06-07T22:33:43.1835559Z 
   2021-06-07T22:33:43.1836251Z Previous write at 0x00c0003de4a8 by goroutine 294:
   2021-06-07T22:33:43.1837949Z time="2021-06-07T22:33:41Z" level=info msg="[Connected consumer]" consumerID=2 name= subscription=reader-kcnmq topic=my-topic-971826719
   2021-06-07T22:33:43.1839441Z   github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).grabConn()
   2021-06-07T22:33:43.1841513Z       /pulsar-client-go/pulsar/consumer_partition.go:974 +0x1875
   2021-06-07T22:33:43.1842783Z   github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).reconnectToBroker()
   2021-06-07T22:33:43.1844507Z       /pulsar-client-go/pulsar/consumer_partition.go:887 +0x2db
   2021-06-07T22:33:43.1845873Z   github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).runEventsLoop.func2()
   2021-06-07T22:33:43.1847183Z       /pulsar-client-go/pulsar/consumer_partition.go:791 +0xbe
   2021-06-07T22:33:43.1847678Z 
   2021-06-07T22:33:43.1848159Z Goroutine 463 (running) created at:
   2021-06-07T22:33:43.1849312Z   github.com/apache/pulsar-client-go/pulsar.newPartitionConsumer()
   2021-06-07T22:33:43.1850983Z       /pulsar-client-go/pulsar/consumer_partition.go:208 +0xf46
   2021-06-07T22:33:43.1852092Z   github.com/apache/pulsar-client-go/pulsar.newReader()
   2021-06-07T22:33:43.1853173Z       /pulsar-client-go/pulsar/reader_impl.go:105 +0x8ab
   2021-06-07T22:33:43.1854294Z   github.com/apache/pulsar-client-go/pulsar.(*client).CreateReader()
   2021-06-07T22:33:43.1855391Z       /pulsar-client-go/pulsar/client_impl.go:170 +0xcb
   2021-06-07T22:33:43.1857422Z   github.com/apache/pulsar-client-go/pulsar.TestReaderLatestInclusiveHasNext()
   2021-06-07T22:33:43.1858855Z       /pulsar-client-go/pulsar/reader_test.go:587 +0x946
   2021-06-07T22:33:43.1859491Z   testing.tRunner()
   2021-06-07T22:33:43.1860118Z       /usr/local/go/src/testing/testing.go:909 +0x199
   2021-06-07T22:33:43.1860498Z 
   2021-06-07T22:33:43.1860966Z Goroutine 294 (running) created at:
   2021-06-07T22:33:43.1862630Z   github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).runEventsLoop()
   2021-06-07T22:33:43.1864091Z       /pulsar-client-go/pulsar/consumer_partition.go:784 +0x174
   2021-06-07T22:33:43.1865119Z ==================
   ```
   ### Modifications
   
   Store the internal.Connection managed for the partitionConsumer in an `atomic.Value`: https://golang.org/pkg/sync/atomic/#Value
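A minimal sketch of this pattern using only the standard library's `sync/atomic.Value`. `Connection`, `fakeConn`, `consumer`, `setConn`, `getConn` and `reconnectWhileReading` are hypothetical stand-ins, not the pulsar-client-go code. One caveat: `Store` panics on `nil` and on a value whose concrete type differs from the first one stored, so the pattern assumes every connection stored has the same concrete type.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Connection is a hypothetical stand-in for internal.Connection.
type Connection interface {
	ID() int
}

// fakeConn is a hypothetical concrete connection type.
type fakeConn struct{ id int }

func (c *fakeConn) ID() int { return c.id }

// consumer keeps its connection in an atomic.Value so one goroutine can
// swap it on reconnect while another reads it, without a data race.
type consumer struct {
	conn atomic.Value // always holds a Connection (concretely *fakeConn)
}

func (c *consumer) setConn(conn Connection) {
	// Store panics on nil and on a concrete type that differs from the
	// first value stored, so callers must always pass the same type.
	c.conn.Store(conn)
}

func (c *consumer) getConn() Connection {
	// Unchecked assertion: panics if nothing has been stored yet.
	return c.conn.Load().(Connection)
}

// reconnectWhileReading simulates a reconnect goroutine replacing the
// connection n times while the events loop keeps reading it.
func reconnectWhileReading(n int) int {
	c := &consumer{}
	c.setConn(&fakeConn{id: 0})

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // reconnect goroutine: the writer
		defer wg.Done()
		for i := 1; i <= n; i++ {
			c.setConn(&fakeConn{id: i})
		}
	}()
	for i := 0; i < n; i++ { // events loop: the reader
		_ = c.getConn().ID()
	}
	wg.Wait()
	return c.getConn().ID()
}

func main() {
	fmt.Println(reconnectWhileReading(1000)) // prints 1000
}
```

Running this under `go run -race` should report no race. Replacing the `atomic.Value` with a plain `Connection` field would likely produce a report similar to the one in the Motivation section.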
   
   ### Verifying this change
   
   This change is already covered by existing tests that use `partitionConsumer` instances, such as `pulsar/reader_test`.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r647587759



##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
 		pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
 	}
 
-	pc.conn = res.Cnx
+	pc._setConn(res.Cnx)
 	pc.log.Info("Connected consumer")
-	pc.conn.AddConsumeHandler(pc.consumerID, pc)
+	pc._getConn().AddConsumeHandler(pc.consumerID, pc)

Review comment:
       What's the behavior if _getConn() ever returns nil? Same for all the other places.







[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer

Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r655573130



##########
File path: pulsar/consumer_partition.go
##########
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"math"
 	"sync"
+	syncAtomic "sync/atomic"

Review comment:
       @jonyhy96 ,
   
   Good suggestion; I've cut over to using [uber-go.atomic.Value](https://github.com/uber-go/atomic/blob/master/value.go)







[GitHub] [pulsar-client-go] jonyhy96 commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer

Posted by GitBox <gi...@apache.org>.
jonyhy96 commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r655970263



##########
File path: pulsar/consumer_partition.go
##########
@@ -1113,6 +1114,17 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
 		})
 }
 
+func (pc *partitionConsumer) _setConn(conn internal.Connection) {
+	pc.conn.Store(conn)
+}
+
+func (pc *partitionConsumer) _getConn() internal.Connection {

Review comment:
       > I wanted to make it clear that these methods should not be used by other components in the pulsar package
   
   Agree with this, but this is not a guarantee. Maybe we can add some comment on this function.







[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer

Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r655572053



##########
File path: pulsar/consumer_partition.go
##########
@@ -1113,6 +1114,17 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
 		})
 }
 
+func (pc *partitionConsumer) _setConn(conn internal.Connection) {
+	pc.conn.Store(conn)
+}
+
+func (pc *partitionConsumer) _getConn() internal.Connection {

Review comment:
       @jonyhy96 ,
   
   I wanted to make it clear that these methods should not be used by other components in the `pulsar` package, but I could remove this if you don't like it.  Admittedly this style of naming is not used anywhere else in the codebase. 







[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r647587759



##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
 		pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
 	}
 
-	pc.conn = res.Cnx
+	pc._setConn(res.Cnx)
 	pc.log.Info("Connected consumer")
-	pc.conn.AddConsumeHandler(pc.consumerID, pc)
+	pc._getConn().AddConsumeHandler(pc.consumerID, pc)

Review comment:
       What's the behavior if _getConn() ever returns nil? Is that possible? Same for all the other places.







[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer

Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r647666038



##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
 		pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
 	}
 
-	pc.conn = res.Cnx
+	pc._setConn(res.Cnx)
 	pc.log.Info("Connected consumer")
-	pc.conn.AddConsumeHandler(pc.consumerID, pc)
+	pc._getConn().AddConsumeHandler(pc.consumerID, pc)

Review comment:
       I've made the cast unchecked and added a comment that explains why. 







[GitHub] [pulsar-client-go] jonyhy96 commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer

Posted by GitBox <gi...@apache.org>.
jonyhy96 commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r655283656



##########
File path: pulsar/consumer_partition.go
##########
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"math"
 	"sync"
+	syncAtomic "sync/atomic"

Review comment:
       Perhaps you don't need to alias the atomic package; just use `atomic`.







[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer

Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r647628798



##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
 		pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
 	}
 
-	pc.conn = res.Cnx
+	pc._setConn(res.Cnx)
 	pc.log.Info("Connected consumer")
-	pc.conn.AddConsumeHandler(pc.consumerID, pc)
+	pc._getConn().AddConsumeHandler(pc.consumerID, pc)

Review comment:
       @cckellogg ,
   
   Good question; `_getConn()` should never return `nil`.
   
   An invariant in this code is that the `partitionConsumer.conn` field must be set and is never `nil`.
   The `grabConn()` method sets the `partitionConsumer.conn` field; `grabConn()` is called from the `newPartitionConsumer` factory method which will fail construction of the partitionConsumer if grabConn() returns an error.
   
   The above said, it is probably better for the cast in `_getConn()` to be unchecked and let the code `panic()` if the invariant is broken.  Thoughts?
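A small sketch of the two cast styles being weighed here. It uses a hypothetical `Connection` interface and hypothetical helpers (`getConnChecked`, `getConnUnchecked`, `panics`, `demo`) rather than the PR's code. On an `atomic.Value` that has never been stored to, `Load` returns `nil`. The comma-ok assertion then quietly yields a `nil` connection, while the unchecked assertion panics at the point of access.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Connection is a hypothetical stand-in for internal.Connection.
type Connection interface{ ID() int }

// getConnChecked uses the comma-ok form: it returns nil when nothing has
// been stored, so the failure surfaces later as a nil-pointer dereference.
func getConnChecked(v *atomic.Value) Connection {
	c, _ := v.Load().(Connection)
	return c
}

// getConnUnchecked panics immediately if the "conn is never nil"
// invariant is broken, pointing straight at the broken invariant.
func getConnUnchecked(v *atomic.Value) Connection {
	return v.Load().(Connection)
}

// panics reports whether f panics.
func panics(f func()) (didPanic bool) {
	defer func() {
		if recover() != nil {
			didPanic = true
		}
	}()
	f()
	return false
}

// demo exercises both accessors on a Value that was never stored to.
func demo() (checkedIsNil, uncheckedPanics bool) {
	var empty atomic.Value // Load returns nil until the first Store
	checkedIsNil = getConnChecked(&empty) == nil
	uncheckedPanics = panics(func() { getConnUnchecked(&empty) })
	return checkedIsNil, uncheckedPanics
}

func main() {
	fmt.Println(demo()) // true true
}
```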







[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer

Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r654841411



##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
 		pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
 	}
 
-	pc.conn = res.Cnx
+	pc._setConn(res.Cnx)
 	pc.log.Info("Connected consumer")
-	pc.conn.AddConsumeHandler(pc.consumerID, pc)
+	pc._getConn().AddConsumeHandler(pc.consumerID, pc)

Review comment:
       @cckellogg ,
   
   I've reverted the behaviour of this PR back to the first approach, making access to the connection in the partitionConsumer atomic.







[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer

Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r656399284



##########
File path: pulsar/consumer_partition.go
##########
@@ -1113,6 +1114,17 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
 		})
 }
 
+func (pc *partitionConsumer) _setConn(conn internal.Connection) {
+	pc.conn.Store(conn)
+}
+
+func (pc *partitionConsumer) _getConn() internal.Connection {

Review comment:
       Sure; comments added.







[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer

Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r654848488



##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
 		pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
 	}
 
-	pc.conn = res.Cnx
+	pc._setConn(res.Cnx)
 	pc.log.Info("Connected consumer")
-	pc.conn.AddConsumeHandler(pc.consumerID, pc)
+	pc._getConn().AddConsumeHandler(pc.consumerID, pc)

Review comment:
       The latest CI failure is going to be addressed by https://github.com/apache/pulsar-client-go/pull/544 







[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r648521135



##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
 		pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
 	}
 
-	pc.conn = res.Cnx
+	pc._setConn(res.Cnx)
 	pc.log.Info("Connected consumer")
-	pc.conn.AddConsumeHandler(pc.consumerID, pc)
+	pc._getConn().AddConsumeHandler(pc.consumerID, pc)

Review comment:
       There was a change that moved the broker reconnect out of the events goroutine (https://github.com/apache/pulsar-client-go/pull/376/files). This is now causing the data race issue.
   
   The question is: if there is a reconnection, what should happen to the pending events in the event channel? Right now they are processed using a stale/closed connection. Even with this fix, events may still be processed using a stale connection. Maybe that is OK, but for me it makes the code difficult to follow and reason about. Ideally, we could come up with a cleaner approach so that we never unintentionally use a stale/closed connection.
   
   Thoughts?







[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer

Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r647666038



##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
 		pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
 	}
 
-	pc.conn = res.Cnx
+	pc._setConn(res.Cnx)
 	pc.log.Info("Connected consumer")
-	pc.conn.AddConsumeHandler(pc.consumerID, pc)
+	pc._getConn().AddConsumeHandler(pc.consumerID, pc)

Review comment:
       I've made the cast unchecked and added a comment to explain why. 







[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer

Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r649526755



##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
 		pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
 	}
 
-	pc.conn = res.Cnx
+	pc._setConn(res.Cnx)
 	pc.log.Info("Connected consumer")
-	pc.conn.AddConsumeHandler(pc.consumerID, pc)
+	pc._getConn().AddConsumeHandler(pc.consumerID, pc)

Review comment:
       @cckellogg ,
   I'm thinking that since [PR 376](https://github.com/apache/pulsar-client-go/pull/376) drains the connection's incomingRequestsCh on close, it should be possible to remove the extra go-routine in the partitionConsumer and select on the connectionClosedCh in partitionConsumer.runEventsLoop. What do you think?
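One way to picture this proposal is the following sketch. It uses hypothetical names (`consumer`, `eventsCh`, `connectionClosedCh`, `closeCh`, `reconnect`, and the event names) rather than the linked commit's code. The events loop selects on the connection-closed signal alongside its events and reconnects inline. As a result, the connection field is owned by a single goroutine and needs no atomic access, and events received after the reconnect are handled with the new connection. Because `select` picks randomly among ready cases, the demo keeps ordering deterministic by sending one signal at a time over unbuffered channels.

```go
package main

import "fmt"

// connection is hypothetical; gen counts reconnections.
type connection struct{ gen int }

// event is a hypothetical request queued for the events loop.
type event struct{ name string }

type consumer struct {
	conn               *connection // owned by runEventsLoop; no other goroutine touches it
	eventsCh           chan event
	connectionClosedCh chan struct{}
	closeCh            chan struct{}
	handled            []string
}

// reconnect runs on the events-loop goroutine, so replacing c.conn
// needs no lock or atomic.
func (c *consumer) reconnect() {
	c.conn = &connection{gen: c.conn.gen + 1}
}

// handle records which connection generation served each event.
func (c *consumer) handle(e event) {
	c.handled = append(c.handled, fmt.Sprintf("%s@%d", e.name, c.conn.gen))
}

func (c *consumer) runEventsLoop() {
	for {
		select {
		case <-c.closeCh:
			return
		case <-c.connectionClosedCh:
			c.reconnect()
		case e := <-c.eventsCh:
			c.handle(e)
		}
	}
}

// demo sends one signal at a time over unbuffered channels so the
// order in which the loop observes them is deterministic.
func demo() []string {
	c := &consumer{
		conn:               &connection{gen: 0},
		eventsCh:           make(chan event),
		connectionClosedCh: make(chan struct{}),
		closeCh:            make(chan struct{}),
	}
	go func() {
		c.eventsCh <- event{name: "getLastMessageID"}
		c.connectionClosedCh <- struct{}{} // broker connection dropped
		c.eventsCh <- event{name: "seek"}
		close(c.closeCh)
	}()
	c.runEventsLoop()
	return c.handled
}

func main() {
	fmt.Println(demo()) // [getLastMessageID@0 seek@1]
}
```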
   







[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer

Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r649571586



##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
 		pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
 	}
 
-	pc.conn = res.Cnx
+	pc._setConn(res.Cnx)
 	pc.log.Info("Connected consumer")
-	pc.conn.AddConsumeHandler(pc.consumerID, pc)
+	pc._getConn().AddConsumeHandler(pc.consumerID, pc)

Review comment:
       @cckellogg ,
   I've attempted the above with the following commit:
   https://github.com/apache/pulsar-client-go/pull/535/commits/17335cd7846b71bd14592c944ea76f3fbcfdafd6
   
   If this change is accepted I'll clean up this branch and update the PR description. 







[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer

Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r647628798



##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
 		pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
 	}
 
-	pc.conn = res.Cnx
+	pc._setConn(res.Cnx)
 	pc.log.Info("Connected consumer")
-	pc.conn.AddConsumeHandler(pc.consumerID, pc)
+	pc._getConn().AddConsumeHandler(pc.consumerID, pc)

Review comment:
       @cckellogg ,
   
   Good question; `_getConn()` should never return `nil`.
   
   An invariant in this code is that the `partitionConsumer.conn` field must be set and is never `nil`.
   The `grabConn()` method sets the `partitionConsumer.conn` field; `grabConn()` is called from the `newPartitionConsumer` factory method which will fail construction of the partitionConsumer if grabConn() returns an error.
   
   The above said, it is probably better for the cast in `_getConn()` to be unchecked and let the code `panic()` if the invariant is broken, rather than returning `nil` and causing a `nil` pointer dereference further down the line.  Thoughts?








[GitHub] [pulsar-client-go] merlimat merged pull request #535: Fix data race while accessing connection in partitionConsumer

Posted by GitBox <gi...@apache.org>.
merlimat merged pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535


   





[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer

Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r650340516



##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
 		pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
 	}
 
-	pc.conn = res.Cnx
+	pc._setConn(res.Cnx)
 	pc.log.Info("Connected consumer")
-	pc.conn.AddConsumeHandler(pc.consumerID, pc)
+	pc._getConn().AddConsumeHandler(pc.consumerID, pc)

Review comment:
       In https://github.com/apache/pulsar-client-go/pull/535/commits/5dfcc1300eee30824efa5945bf463beb3bc3474e we also short-circuit broker re-connection attempts on consumer close.
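The following sketch shows what short-circuiting reconnection on close can look like. It assumes a hypothetical `grab` callback, a `closeCh` that is closed by `Close()`, and exponential backoff. The names and backoff policy are illustrative and are not taken from the linked commit. The close check runs both before each attempt and during the backoff sleep, so a closed consumer stops retrying without waiting out the backoff.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errConsumerClosed is a hypothetical sentinel: retries stopped because of Close().
var errConsumerClosed = errors.New("consumer closed")

// reconnectToBroker retries grab with exponential backoff until it succeeds,
// maxAttempts is reached, or closeCh is closed. All names are hypothetical.
func reconnectToBroker(grab func() error, closeCh <-chan struct{},
	backoff time.Duration, maxAttempts int) (int, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		// Short-circuit before dialing if the consumer is already closed.
		select {
		case <-closeCh:
			return attempt - 1, errConsumerClosed
		default:
		}
		if err = grab(); err == nil {
			return attempt, nil
		}
		// Sleep for the backoff, but wake up early if the consumer closes.
		timer := time.NewTimer(backoff)
		select {
		case <-closeCh:
			timer.Stop()
			return attempt, errConsumerClosed
		case <-timer.C:
		}
		backoff *= 2
	}
	return maxAttempts, err
}

// closeOnSecondFailure closes the consumer during the second failed attempt,
// so the loop must return during the backoff instead of retrying.
func closeOnSecondFailure() (int, error) {
	closeCh := make(chan struct{})
	calls := 0
	grab := func() error {
		calls++
		if calls == 2 {
			close(closeCh)
		}
		return errors.New("broker unavailable")
	}
	return reconnectToBroker(grab, closeCh, 5*time.Millisecond, 10)
}

// succeedOnThirdAttempt models a broker that comes back after two failures.
func succeedOnThirdAttempt() (int, error) {
	calls := 0
	grab := func() error {
		calls++
		if calls < 3 {
			return errors.New("broker unavailable")
		}
		return nil
	}
	return reconnectToBroker(grab, make(chan struct{}), time.Millisecond, 10)
}

func main() {
	fmt.Println(closeOnSecondFailure())  // 2 consumer closed
	fmt.Println(succeedOnThirdAttempt()) // 3 <nil>
}
```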







[GitHub] [pulsar-client-go] jonyhy96 commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer

Posted by GitBox <gi...@apache.org>.
jonyhy96 commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r655284649



##########
File path: pulsar/consumer_partition.go
##########
@@ -1113,6 +1114,17 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
 		})
 }
 
+func (pc *partitionConsumer) _setConn(conn internal.Connection) {
+	pc.conn.Store(conn)
+}
+
+func (pc *partitionConsumer) _getConn() internal.Connection {

Review comment:
       Maybe we don’t need _ as a function prefix



