Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/25 21:09:59 UTC

[GitHub] [pulsar-client-go] zzzming opened a new pull request #451: Issue #448 make state thread safe in consumer_partition and connection

zzzming opened a new pull request #451:
URL: https://github.com/apache/pulsar-client-go/pull/451


   
   ### Motivation
   Address issue #448. connection.go is also missing synchronization on a few accesses to state.
   
   ### Modifications
   
   Use Go's sync/atomic to synchronize access to state, and add the missing synchronization in connection.go.
   
   ### Verifying this change
   
   ### Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API: no
     - The schema: no
     - The default values of configurations: no
     - The wire protocol: no
   
   ### Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented?
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] merlimat commented on a change in pull request #451: Issue #448 make state thread safe in consumer_partition and connection

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #451:
URL: https://github.com/apache/pulsar-client-go/pull/451#discussion_r564216969



##########
File path: pulsar/producer_partition.go
##########
@@ -263,7 +267,7 @@ func (p *partitionProducer) reconnectToBroker() {
 	}
 
 	for maxRetry != 0 {
-		if atomic.LoadInt32(&p.state) != producerReady {
+		if p.getProduerState() != producerReady {

Review comment:
       ```suggestion
   		if p.getProducerState() != producerReady {
   ```







[GitHub] [pulsar-client-go] merlimat commented on pull request #451: Issue #448 make state thread safe in consumer_partition and connection

Posted by GitBox <gi...@apache.org>.
merlimat commented on pull request #451:
URL: https://github.com/apache/pulsar-client-go/pull/451#issuecomment-767141804


   The `uber/atomic` looks indeed more readable and safer than the `sync/atomic`. It would be good to use that consistently
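
As a rough illustration of the readability point, here is a minimal sketch that uses only the standard library. `typedInt32` is a hypothetical stand-in for the kind of wrapper `uber/atomic` provides, not that package's implementation, and the `rawConn`/`wrappedConn` structs and state constants are made up for the example.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// typedInt32 is a hypothetical wrapper (an assumption for this sketch, not
// the uber/atomic type). If it lived in its own package, the unexported
// field would force callers to go through the atomic methods below.
type typedInt32 struct {
	v int32
}

func (t *typedInt32) Load() int32   { return atomic.LoadInt32(&t.v) }
func (t *typedInt32) Store(n int32) { atomic.StoreInt32(&t.v, n) }
func (t *typedInt32) CAS(old, next int32) bool {
	return atomic.CompareAndSwapInt32(&t.v, old, next)
}

// Illustrative states; the names are not taken from the pull request.
const (
	stateInit int32 = iota
	stateReady
	stateClosed
)

// rawConn keeps a bare int32: every access has to remember to call
// sync/atomic, and a plain read such as c.state would still compile.
type rawConn struct {
	state int32
}

// wrappedConn holds the wrapper, so each access names its atomic operation.
type wrappedConn struct {
	state typedInt32
}

func main() {
	raw := &rawConn{}
	atomic.StoreInt32(&raw.state, stateReady)
	fmt.Println("raw ready:", atomic.LoadInt32(&raw.state) == stateReady)

	w := &wrappedConn{}
	w.state.Store(stateReady)
	fmt.Println("wrapped ready:", w.state.Load() == stateReady)
	fmt.Println("first CAS:", w.state.CAS(stateReady, stateClosed))
	fmt.Println("second CAS:", w.state.CAS(stateReady, stateClosed))
}
```

The second CAS fails because the state is no longer `stateReady`, which is the property a close-once check relies on.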





[GitHub] [pulsar-client-go] flowchartsman commented on pull request #451: Issue #448 make state thread safe in consumer_partition and connection

Posted by GitBox <gi...@apache.org>.
flowchartsman commented on pull request #451:
URL: https://github.com/apache/pulsar-client-go/pull/451#issuecomment-767132971


   I'm not particularly attached to uber's atomic; I used it for expediency's sake. But direct calls to atomic always make me nervous if I'm doing them from multiple places, so guarding it in a (probably inlined) method is usually the way I like to go. YMMV. But, if it works and it's consistent with other code, who am I to judge? ¯\_ʕ◔ϖ◔ʔ_/¯ Aside from that, LGTM





[GitHub] [pulsar-client-go] merlimat commented on a change in pull request #451: Issue #448 make state thread safe in consumer_partition and connection

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #451:
URL: https://github.com/apache/pulsar-client-go/pull/451#discussion_r564216544



##########
File path: pulsar/producer_partition.go
##########
@@ -703,8 +707,22 @@ func (p *partitionProducer) Flush() error {
 	return cp.err
 }
 
+func (p *partitionProducer) getProduerState() producerState {

Review comment:
       ```suggestion
   func (p *partitionProducer) getProducerState() producerState {
   ```










[GitHub] [pulsar-client-go] zzzming commented on pull request #451: Issue #448 make state thread safe in consumer_partition and connection

Posted by GitBox <gi...@apache.org>.
zzzming commented on pull request #451:
URL: https://github.com/apache/pulsar-client-go/pull/451#issuecomment-767283762


   > The `uber/atomic` looks indeed more readable and safer than the `sync/atomic`. It would be good to use that consistently
   
   updated all states with uber/atomic





[GitHub] [pulsar-client-go] merlimat merged pull request #451: Issue #448 make state thread safe in consumer_partition and connection

Posted by GitBox <gi...@apache.org>.
merlimat merged pull request #451:
URL: https://github.com/apache/pulsar-client-go/pull/451


   





[GitHub] [pulsar-client-go] zzzming commented on pull request #451: Issue #448 make state thread safe in consumer_partition and connection

Posted by GitBox <gi...@apache.org>.
zzzming commented on pull request #451:
URL: https://github.com/apache/pulsar-client-go/pull/451#issuecomment-767134745


   > I'm not particularly attached to uber's atomic, I used it for expediency's sake, but direct calls to atomic always make me nervous if I'm doing them from multiple places, so guarding it in a (probably inlined) method is usually the way I like to go. YMMV. But, if it works and it's consistent with other code, who am I to judge? ¯_ʕ◔ϖ◔ʔ_/¯ Aside from that, LGTM
   
   @flowchartsman I share the same sentiment; I would tend to forget to add sync/atomic too. Let's wait for Matteo to weigh in. It's a minor change either way. Cheers





[GitHub] [pulsar-client-go] merlimat commented on a change in pull request #451: Issue #448 make state thread safe in consumer_partition and connection

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #451:
URL: https://github.com/apache/pulsar-client-go/pull/451#discussion_r564217129



##########
File path: pulsar/producer_partition.go
##########
@@ -703,8 +707,22 @@ func (p *partitionProducer) Flush() error {
 	return cp.err
 }
 
+func (p *partitionProducer) getProduerState() producerState {
+	return producerState(p.state.Load())
+}
+
+func (p *partitionProducer) setProducerState(state producerState) {
+	p.state.Swap(int32(state))
+}
+
+// set a new consumerState and return the last state
+// returns bool if the new state has been set or not
+func (p *partitionProducer) casProducerState(oldState, newState producerState) bool {
+	return p.state.CAS(int32(oldState), int32(newState))
+}
+
 func (p *partitionProducer) Close() {
-	if atomic.LoadInt32(&p.state) != producerReady {
+	if p.getProduerState() != producerReady {

Review comment:
       ```suggestion
   	if p.getProducerState() != producerReady {
   ```
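
To illustrate why a compare-and-swap helper in the style of `casProducerState` is useful, the following sketch runs several goroutines that all try to close the same object; only the goroutine whose CAS succeeds performs the teardown. It uses the standard library's `sync/atomic` rather than `uber/atomic` so that it is self-contained. The `worker` type, its states, `closeOnce`, and `runClosers` are hypothetical names, not code from this pull request.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// workerState and its values are illustrative assumptions.
type workerState int32

const (
	workerReady workerState = iota // zero value: a new worker starts ready
	workerClosing
	workerClosed
)

type worker struct {
	state    int32 // touched only through the helpers below
	teardown int32 // counts how many times teardown ran
}

func (w *worker) getState() workerState {
	return workerState(atomic.LoadInt32(&w.state))
}

func (w *worker) setState(s workerState) {
	atomic.StoreInt32(&w.state, int32(s))
}

// casState reports whether the state was changed from oldState to newState.
func (w *worker) casState(oldState, newState workerState) bool {
	return atomic.CompareAndSwapInt32(&w.state, int32(oldState), int32(newState))
}

// closeOnce reports whether this call was the one that performed teardown.
func (w *worker) closeOnce() bool {
	if !w.casState(workerReady, workerClosing) {
		return false // another caller already started or finished closing
	}
	atomic.AddInt32(&w.teardown, 1)
	w.setState(workerClosed)
	return true
}

// runClosers calls closeOnce from n goroutines and returns how many of
// those calls performed the teardown.
func runClosers(w *worker, n int) int32 {
	var wg sync.WaitGroup
	var winners int32
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if w.closeOnce() {
				atomic.AddInt32(&winners, 1)
			}
		}()
	}
	wg.Wait()
	return winners
}

func main() {
	w := &worker{}
	fmt.Println("winners:", runClosers(w, 8))
	fmt.Println("closed:", w.getState() == workerClosed)
}
```

A separate load followed by a store would let two goroutines both observe the ready state and both run the teardown; folding the check and the update into one CAS avoids that window.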







[GitHub] [pulsar-client-go] merlimat commented on a change in pull request #451: Issue #448 make state thread safe in consumer_partition and connection

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #451:
URL: https://github.com/apache/pulsar-client-go/pull/451#discussion_r564217027



##########
File path: pulsar/producer_partition.go
##########
@@ -443,7 +447,7 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 
 func (p *partitionProducer) failTimeoutMessages() {
 	// since Closing/Closed connection couldn't be reopen, load and compare is safe
-	state := atomic.LoadInt32(&p.state)
+	state := p.getProduerState()

Review comment:
       ```suggestion
   	state := p.getProducerState()
   ```






