You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/08 00:04:12 UTC
[GitHub] [pulsar-client-go] dferstay opened a new pull request #535: Fix data race while accessing connection in partitionConsumer
dferstay opened a new pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535
The partitionConsumer maintains a few internal go-routines, two of which
access the underlying internal.Connection. The main runEvenstLoop()
go-routine reads the connection field while a separate go-routine is used
to detect connnection loss, initiate reconnection, and sets the connection.
Previously, access to the conn field was not synchronized.
Now, the conn field is read and written atomically; avoiding race
conditions.
Signed-off-by: Daniel Ferstay <df...@splunk.com>
### Motivation
While attempting to submit a separate PR (https://github.com/apache/pulsar-client-go/pull/534) I found that the `pulsar/reader_test` consistently failed with the following data race. This change in this PR is an attempt to fix it.
```
2021-06-07T22:33:43.1825587Z ==================
2021-06-07T22:33:43.1825992Z WARNING: DATA RACE
2021-06-07T22:33:43.1826513Z Read at 0x00c0003de4a8 by goroutine 463:
2021-06-07T22:33:43.1828038Z github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).requestGetLastMessageID()
2021-06-07T22:33:43.1829418Z /pulsar-client-go/pulsar/consumer_partition.go:279 +0x27e
2021-06-07T22:33:43.1830873Z github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).internalGetLastMessageID()
2021-06-07T22:33:43.1832427Z /pulsar-client-go/pulsar/consumer_partition.go:270 +0xea
2021-06-07T22:33:43.1833799Z github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).runEventsLoop()
2021-06-07T22:33:43.1835074Z /pulsar-client-go/pulsar/consumer_partition.go:806 +0x2cb
2021-06-07T22:33:43.1835559Z
2021-06-07T22:33:43.1836251Z Previous write at 0x00c0003de4a8 by goroutine 294:
2021-06-07T22:33:43.1837949Z time="2021-06-07T22:33:41Z" level=info msg="[Connected consumer]" consumerID=2 name= subscription=reader-kcnmq topic=my-topic-971826719
2021-06-07T22:33:43.1839441Z github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).grabConn()
2021-06-07T22:33:43.1841513Z /pulsar-client-go/pulsar/consumer_partition.go:974 +0x1875
2021-06-07T22:33:43.1842783Z github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).reconnectToBroker()
2021-06-07T22:33:43.1844507Z /pulsar-client-go/pulsar/consumer_partition.go:887 +0x2db
2021-06-07T22:33:43.1845873Z github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).runEventsLoop.func2()
2021-06-07T22:33:43.1847183Z /pulsar-client-go/pulsar/consumer_partition.go:791 +0xbe
2021-06-07T22:33:43.1847678Z
2021-06-07T22:33:43.1848159Z Goroutine 463 (running) created at:
2021-06-07T22:33:43.1849312Z github.com/apache/pulsar-client-go/pulsar.newPartitionConsumer()
2021-06-07T22:33:43.1850983Z /pulsar-client-go/pulsar/consumer_partition.go:208 +0xf46
2021-06-07T22:33:43.1852092Z github.com/apache/pulsar-client-go/pulsar.newReader()
2021-06-07T22:33:43.1853173Z /pulsar-client-go/pulsar/reader_impl.go:105 +0x8ab
2021-06-07T22:33:43.1854294Z github.com/apache/pulsar-client-go/pulsar.(*client).CreateReader()
2021-06-07T22:33:43.1855391Z /pulsar-client-go/pulsar/client_impl.go:170 +0xcb
2021-06-07T22:33:43.1857422Z github.com/apache/pulsar-client-go/pulsar.TestReaderLatestInclusiveHasNext()
2021-06-07T22:33:43.1858855Z /pulsar-client-go/pulsar/reader_test.go:587 +0x946
2021-06-07T22:33:43.1859491Z testing.tRunner()
2021-06-07T22:33:43.1860118Z /usr/local/go/src/testing/testing.go:909 +0x199
2021-06-07T22:33:43.1860498Z
2021-06-07T22:33:43.1860966Z Goroutine 294 (running) created at:
2021-06-07T22:33:43.1862630Z github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).runEventsLoop()
2021-06-07T22:33:43.1864091Z /pulsar-client-go/pulsar/consumer_partition.go:784 +0x174
2021-06-07T22:33:43.1865119Z ==================
```
### Modifications
Store the internal.Connection managed for the partitionConsumer in an `atomic.Value`: https://golang.org/pkg/sync/atomic/#Value
### Verifying this change
This change is already covered by existing tests that use `partitionConsumer` instances, such as `pulsar/reader_test`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r647587759
##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
}
- pc.conn = res.Cnx
+ pc._setConn(res.Cnx)
pc.log.Info("Connected consumer")
- pc.conn.AddConsumeHandler(pc.consumerID, pc)
+ pc._getConn().AddConsumeHandler(pc.consumerID, pc)
Review comment:
What's the behavior if _getConn() ever returns nil? Same for all the other places.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r655573130
##########
File path: pulsar/consumer_partition.go
##########
@@ -21,6 +21,7 @@ import (
"fmt"
"math"
"sync"
+ syncAtomic "sync/atomic"
Review comment:
@jonyhy96 ,
Good suggestion; I've cut over to using [uber-go.atomic.Value](https://github.com/uber-go/atomic/blob/master/value.go)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] jonyhy96 commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
jonyhy96 commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r655970263
##########
File path: pulsar/consumer_partition.go
##########
@@ -1113,6 +1114,17 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
})
}
+func (pc *partitionConsumer) _setConn(conn internal.Connection) {
+ pc.conn.Store(conn)
+}
+
+func (pc *partitionConsumer) _getConn() internal.Connection {
Review comment:
> I wanted to make it clear that these methods should not be used by other components in the pulsar package
Agree with this, but this is not a guarantee. Maybe we can add some comment on this function.
##########
File path: pulsar/consumer_partition.go
##########
@@ -1113,6 +1114,17 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
})
}
+func (pc *partitionConsumer) _setConn(conn internal.Connection) {
+ pc.conn.Store(conn)
+}
+
+func (pc *partitionConsumer) _getConn() internal.Connection {
Review comment:
> I wanted to make it clear that these methods should not be used by other components in the pulsar package
Agree with this, but this is not a guarantee. Maybe we can add some comment on this function.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r655572053
##########
File path: pulsar/consumer_partition.go
##########
@@ -1113,6 +1114,17 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
})
}
+func (pc *partitionConsumer) _setConn(conn internal.Connection) {
+ pc.conn.Store(conn)
+}
+
+func (pc *partitionConsumer) _getConn() internal.Connection {
Review comment:
@jonyhy96 ,
I wanted to make it clear that these methods should not be used by other components in the `pulsar` package, but I could remove this if you don't like it. Admittedly this style of naming is not used anywhere else in the codebase.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r647587759
##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
}
- pc.conn = res.Cnx
+ pc._setConn(res.Cnx)
pc.log.Info("Connected consumer")
- pc.conn.AddConsumeHandler(pc.consumerID, pc)
+ pc._getConn().AddConsumeHandler(pc.consumerID, pc)
Review comment:
What's the behavior if _getConn() ever returns nil? Is that possible? Same for all the other places.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r647666038
##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
}
- pc.conn = res.Cnx
+ pc._setConn(res.Cnx)
pc.log.Info("Connected consumer")
- pc.conn.AddConsumeHandler(pc.consumerID, pc)
+ pc._getConn().AddConsumeHandler(pc.consumerID, pc)
Review comment:
I've made the cast unchecked and added a comment that explains why.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r655572053
##########
File path: pulsar/consumer_partition.go
##########
@@ -1113,6 +1114,17 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
})
}
+func (pc *partitionConsumer) _setConn(conn internal.Connection) {
+ pc.conn.Store(conn)
+}
+
+func (pc *partitionConsumer) _getConn() internal.Connection {
Review comment:
@jonyhy96 ,
I wanted to make it clear that these methods should not be used by other components in the `pulsar` package, but I could remove this if you don't like it. Admittedly this style of naming is not used anywhere else in the codebase.
##########
File path: pulsar/consumer_partition.go
##########
@@ -21,6 +21,7 @@ import (
"fmt"
"math"
"sync"
+ syncAtomic "sync/atomic"
Review comment:
@jonyhy96 ,
Good suggestion; I've cut over to using [uber-go.atomic.Value](https://github.com/uber-go/atomic/blob/master/value.go)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] jonyhy96 commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
jonyhy96 commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r655283656
##########
File path: pulsar/consumer_partition.go
##########
@@ -21,6 +21,7 @@ import (
"fmt"
"math"
"sync"
+ syncAtomic "sync/atomic"
Review comment:
perhaps you don't need to set alias to atomic package, just use atomic
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r647628798
##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
}
- pc.conn = res.Cnx
+ pc._setConn(res.Cnx)
pc.log.Info("Connected consumer")
- pc.conn.AddConsumeHandler(pc.consumerID, pc)
+ pc._getConn().AddConsumeHandler(pc.consumerID, pc)
Review comment:
@cckellogg ,
Good question; `_getConn()` should never return `nil`.
An invariant in this code is that the `partitionConsumer.conn` field must be set and is never `nil`.
The `grabConn()` method sets the `partitionConsumer.conn` field; `grabConn()` is called from the `newPartitionConsumer` factory method which will fail construction of the partitionConsumer if grabConn() returns an error.
The above said, it is probably better for the cast in `_getConn()` to be unchecked and let the code `panic()` if the invariant is broken. Thoughts?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r654841411
##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
}
- pc.conn = res.Cnx
+ pc._setConn(res.Cnx)
pc.log.Info("Connected consumer")
- pc.conn.AddConsumeHandler(pc.consumerID, pc)
+ pc._getConn().AddConsumeHandler(pc.consumerID, pc)
Review comment:
@cckellogg ,
I've reverted the behaviour of this PR back to the first approach, making access to the connection in the partitionConsumer atomic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r656399284
##########
File path: pulsar/consumer_partition.go
##########
@@ -1113,6 +1114,17 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
})
}
+func (pc *partitionConsumer) _setConn(conn internal.Connection) {
+ pc.conn.Store(conn)
+}
+
+func (pc *partitionConsumer) _getConn() internal.Connection {
Review comment:
Sure; comments added.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r654848488
##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
}
- pc.conn = res.Cnx
+ pc._setConn(res.Cnx)
pc.log.Info("Connected consumer")
- pc.conn.AddConsumeHandler(pc.consumerID, pc)
+ pc._getConn().AddConsumeHandler(pc.consumerID, pc)
Review comment:
The latest CI failure is going to be addressed by https://github.com/apache/pulsar-client-go/pull/544
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r648521135
##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
}
- pc.conn = res.Cnx
+ pc._setConn(res.Cnx)
pc.log.Info("Connected consumer")
- pc.conn.AddConsumeHandler(pc.consumerID, pc)
+ pc._getConn().AddConsumeHandler(pc.consumerID, pc)
Review comment:
There was change that moved the broker reconnect out of the events go routine (https://github.com/apache/pulsar-client-go/pull/376/files). This is now causing the data race issue.
The question is if there is a reconnection what should happen with the pending events in the event channel? Right now they are processed using a stale/closed connection. Even with this fix it's possible events will try to get processed using a stale connection. Maybe that is ok but for me it makes the code difficult to follow and reason about. Ideally, we could come up with a cleaner way so we are never unintentionally using a stale/closed connection.
Thoughts?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r647666038
##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
}
- pc.conn = res.Cnx
+ pc._setConn(res.Cnx)
pc.log.Info("Connected consumer")
- pc.conn.AddConsumeHandler(pc.consumerID, pc)
+ pc._getConn().AddConsumeHandler(pc.consumerID, pc)
Review comment:
I've made the cast unchecked and added a comment to explain why.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r649526755
##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
}
- pc.conn = res.Cnx
+ pc._setConn(res.Cnx)
pc.log.Info("Connected consumer")
- pc.conn.AddConsumeHandler(pc.consumerID, pc)
+ pc._getConn().AddConsumeHandler(pc.consumerID, pc)
Review comment:
@cckellogg ,
I'm thinking that since [PR 376](https://github.com/apache/pulsar-client-go/pull/376) drains the connection incomingRequestsCh on close it should be possible remove the extra go-routine in the partitionConsumer and select from the connectionClosedCh in the partitonConsumer.runEventsLoop. What do you think?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r649571586
##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
}
- pc.conn = res.Cnx
+ pc._setConn(res.Cnx)
pc.log.Info("Connected consumer")
- pc.conn.AddConsumeHandler(pc.consumerID, pc)
+ pc._getConn().AddConsumeHandler(pc.consumerID, pc)
Review comment:
@cckellogg ,
I've attempted the above with the following commit:
https://github.com/apache/pulsar-client-go/pull/535/commits/17335cd7846b71bd14592c944ea76f3fbcfdafd6
If this change is accepted I'll clean up this branch and update the PR description.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r647628798
##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
}
- pc.conn = res.Cnx
+ pc._setConn(res.Cnx)
pc.log.Info("Connected consumer")
- pc.conn.AddConsumeHandler(pc.consumerID, pc)
+ pc._getConn().AddConsumeHandler(pc.consumerID, pc)
Review comment:
@cckellogg ,
Good question; `_getConn()` should never return `nil`.
An invariant in this code is that the `partitionConsumer.conn` field must be set and is never `nil`.
The `grabConn()` method sets the `partitionConsumer.conn` field; `grabConn()` is called from the `newPartitionConsumer` factory method which will fail construction of the partitionConsumer if grabConn() returns an error.
The above said, it is probably better for the cast in `_getConn()` to be unchecked and let the code `panic()` if the invariant is broken rather than returning `nil` causing a `nil` pointer de-reference further down the line. Thoughts?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] jonyhy96 commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
jonyhy96 commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r655283656
##########
File path: pulsar/consumer_partition.go
##########
@@ -21,6 +21,7 @@ import (
"fmt"
"math"
"sync"
+ syncAtomic "sync/atomic"
Review comment:
perhaps you don't need to set alias to atomic package, just use atomic
##########
File path: pulsar/consumer_partition.go
##########
@@ -1113,6 +1114,17 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
})
}
+func (pc *partitionConsumer) _setConn(conn internal.Connection) {
+ pc.conn.Store(conn)
+}
+
+func (pc *partitionConsumer) _getConn() internal.Connection {
Review comment:
Maybe we don’t need _ as a function prefix
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] merlimat merged pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
merlimat merged pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] dferstay commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
dferstay commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r650340516
##########
File path: pulsar/consumer_partition.go
##########
@@ -971,9 +972,9 @@ func (pc *partitionConsumer) grabConn() error {
pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
}
- pc.conn = res.Cnx
+ pc._setConn(res.Cnx)
pc.log.Info("Connected consumer")
- pc.conn.AddConsumeHandler(pc.consumerID, pc)
+ pc._getConn().AddConsumeHandler(pc.consumerID, pc)
Review comment:
In https://github.com/apache/pulsar-client-go/pull/535/commits/5dfcc1300eee30824efa5945bf463beb3bc3474e we also short-circuit broker re-connection attempts on consumer close.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] jonyhy96 commented on a change in pull request #535: Fix data race while accessing connection in partitionConsumer
Posted by GitBox <gi...@apache.org>.
jonyhy96 commented on a change in pull request #535:
URL: https://github.com/apache/pulsar-client-go/pull/535#discussion_r655284649
##########
File path: pulsar/consumer_partition.go
##########
@@ -1113,6 +1114,17 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
})
}
+func (pc *partitionConsumer) _setConn(conn internal.Connection) {
+ pc.conn.Store(conn)
+}
+
+func (pc *partitionConsumer) _getConn() internal.Connection {
Review comment:
Maybe we don’t need _ as a function prefix
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org