You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by "shibd (via GitHub)" <gi...@apache.org> on 2023/02/16 16:48:48 UTC

[GitHub] [pulsar-client-go] shibd opened a new pull request, #958: [feat] Support WaitForExclusive producer access mode.

shibd opened a new pull request, #958:
URL: https://github.com/apache/pulsar-client-go/pull/958

   Master Issue: #931
   
   ### Motivation
   
   #931
   
   ### Modifications
   
   - Support the `WaitForExclusive` Producer access mode config in the `ProducerOptions`.
   
   
   ### Verifying this change
   
   - Add `TestWaitForExclusiveProducer` to cover it.
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (GoDocs)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-go] nodece commented on a diff in pull request #958: [feat] Support WaitForExclusive producer access mode.

Posted by "nodece (via GitHub)" <gi...@apache.org>.
nodece commented on code in PR #958:
URL: https://github.com/apache/pulsar-client-go/pull/958#discussion_r1111416873


##########
pulsar/producer_test.go:
##########
@@ -1737,3 +1737,46 @@ func TestExclusiveProducer(t *testing.T) {
 	assert.Error(t, err, "Producer should be failed")
 	assert.True(t, strings.Contains(err.Error(), "ProducerBusy"))
 }
+
+func TestWaitForExclusiveProducer(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+		// Set the request timeout is 200ms
+		OperationTimeout: 200 * time.Millisecond,
+	})
+	assert.NoError(t, err)
+	defer client.Close()
+
+	topicName := newTopicName()
+	producer1, err := client.CreateProducer(ProducerOptions{
+		Topic:              topicName,
+		ProducerAccessMode: ProducerAccessModeExclusive,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer1)
+	defer producer1.Close()

Review Comment:
   Sorry for my incorrect review. You should remove this line.
   
   You already close this producer on line 1780.
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-go] nodece commented on a diff in pull request #958: [feat] Support WaitForExclusive producer access mode.

Posted by "nodece (via GitHub)" <gi...@apache.org>.
nodece commented on code in PR #958:
URL: https://github.com/apache/pulsar-client-go/pull/958#discussion_r1109515883


##########
pulsar/internal/rpc_client.go:
##########
@@ -129,14 +129,24 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
 			Cnx:      cnx,
 			Response: response,
 		}, err}
-		close(ch)
 	})
 
-	select {
-	case res := <-ch:
-		return res.RPCResult, res.error
-	case <-time.After(c.requestTimeout):
-		return nil, ErrRequestTimeOut
+	timeoutCh := time.After(c.requestTimeout)
+	for {
+		select {
+		case res := <-ch:
+			// Ignoring producer not ready response.
+			// Continue to wait for the producer to create successfully
+			if res.error == nil && *res.RPCResult.Response.Type == pb.BaseCommand_PRODUCER_SUCCESS {
+				if !*res.RPCResult.Response.ProducerSuccess.ProducerReady {
+					timeoutCh = nil

Review Comment:
   Do you want to reset the `timeoutCh`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-go] nodece commented on a diff in pull request #958: [feat] Support WaitForExclusive producer access mode.

Posted by "nodece (via GitHub)" <gi...@apache.org>.
nodece commented on code in PR #958:
URL: https://github.com/apache/pulsar-client-go/pull/958#discussion_r1109515883


##########
pulsar/internal/rpc_client.go:
##########
@@ -129,14 +129,24 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
 			Cnx:      cnx,
 			Response: response,
 		}, err}
-		close(ch)
 	})
 
-	select {
-	case res := <-ch:
-		return res.RPCResult, res.error
-	case <-time.After(c.requestTimeout):
-		return nil, ErrRequestTimeOut
+	timeoutCh := time.After(c.requestTimeout)
+	for {
+		select {
+		case res := <-ch:
+			// Ignoring producer not ready response.
+			// Continue to wait for the producer to create successfully
+			if res.error == nil && *res.RPCResult.Response.Type == pb.BaseCommand_PRODUCER_SUCCESS {
+				if !*res.RPCResult.Response.ProducerSuccess.ProducerReady {
+					timeoutCh = nil

Review Comment:
   Do you want to reset the `timeoutCh`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-go] shibd merged pull request #958: [feat] Support WaitForExclusive producer access mode.

Posted by "shibd (via GitHub)" <gi...@apache.org>.
shibd merged PR #958:
URL: https://github.com/apache/pulsar-client-go/pull/958


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-go] nodece commented on a diff in pull request #958: [feat] Support WaitForExclusive producer access mode.

Posted by "nodece (via GitHub)" <gi...@apache.org>.
nodece commented on code in PR #958:
URL: https://github.com/apache/pulsar-client-go/pull/958#discussion_r1109567687


##########
pulsar/producer_test.go:
##########
@@ -1737,3 +1737,46 @@ func TestExclusiveProducer(t *testing.T) {
 	assert.Error(t, err, "Producer should be failed")
 	assert.True(t, strings.Contains(err.Error(), "ProducerBusy"))
 }
+
+func TestWaitForExclusiveProducer(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+		// Set the request timeout is 200ms
+		OperationTimeout: 200 * time.Millisecond,
+	})
+	assert.NoError(t, err)
+	defer client.Close()
+
+	topicName := newTopicName()
+	producer1, err := client.CreateProducer(ProducerOptions{
+		Topic:              topicName,
+		ProducerAccessMode: ProducerAccessModeExclusive,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer1)
+	defer producer1.Close()

Review Comment:
   ```suggestion
   	defer producer1.Close()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-go] nodece commented on a diff in pull request #958: [feat] Support WaitForExclusive producer access mode.

Posted by "nodece (via GitHub)" <gi...@apache.org>.
nodece commented on code in PR #958:
URL: https://github.com/apache/pulsar-client-go/pull/958#discussion_r1109568855


##########
pulsar/internal/rpc_client.go:
##########
@@ -129,14 +129,24 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
 			Cnx:      cnx,
 			Response: response,
 		}, err}
-		close(ch)
 	})
 
-	select {
-	case res := <-ch:
-		return res.RPCResult, res.error
-	case <-time.After(c.requestTimeout):
-		return nil, ErrRequestTimeOut
+	timeoutCh := time.After(c.requestTimeout)
+	for {
+		select {
+		case res := <-ch:
+			// Ignoring producer not ready response.
+			// Continue to wait for the producer to create successfully
+			if res.error == nil && *res.RPCResult.Response.Type == pb.BaseCommand_PRODUCER_SUCCESS {
+				if !*res.RPCResult.Response.ProducerSuccess.ProducerReady {
+					timeoutCh = nil

Review Comment:
   Do you need to reset `timeoutCh`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-go] shibd commented on a diff in pull request #958: [feat] Support WaitForExclusive producer access mode.

Posted by "shibd (via GitHub)" <gi...@apache.org>.
shibd commented on code in PR #958:
URL: https://github.com/apache/pulsar-client-go/pull/958#discussion_r1111378142


##########
pulsar/internal/rpc_client.go:
##########
@@ -129,14 +129,24 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
 			Cnx:      cnx,
 			Response: response,
 		}, err}
-		close(ch)
 	})
 
-	select {
-	case res := <-ch:
-		return res.RPCResult, res.error
-	case <-time.After(c.requestTimeout):
-		return nil, ErrRequestTimeOut
+	timeoutCh := time.After(c.requestTimeout)
+	for {
+		select {
+		case res := <-ch:
+			// Ignoring producer not ready response.
+			// Continue to wait for the producer to create successfully
+			if res.error == nil && *res.RPCResult.Response.Type == pb.BaseCommand_PRODUCER_SUCCESS {
+				if !*res.RPCResult.Response.ProducerSuccess.ProducerReady {
+					timeoutCh = nil

Review Comment:
   No. After receiving a response from the broker, the client needs to keep waiting indefinitely.



##########
pulsar/producer_test.go:
##########
@@ -1737,3 +1737,46 @@ func TestExclusiveProducer(t *testing.T) {
 	assert.Error(t, err, "Producer should be failed")
 	assert.True(t, strings.Contains(err.Error(), "ProducerBusy"))
 }
+
+func TestWaitForExclusiveProducer(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+		// Set the request timeout is 200ms
+		OperationTimeout: 200 * time.Millisecond,
+	})
+	assert.NoError(t, err)
+	defer client.Close()
+
+	topicName := newTopicName()
+	producer1, err := client.CreateProducer(ProducerOptions{
+		Topic:              topicName,
+		ProducerAccessMode: ProducerAccessModeExclusive,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer1)
+	defer producer1.Close()

Review Comment:
   What needs to change here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org