You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ba...@apache.org on 2023/02/20 14:32:46 UTC
[pulsar-client-go] branch master updated: [feat] Support WaitForExclusive producer access mode. (#958)
This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new cf3207f [feat] Support WaitForExclusive producer access mode. (#958)
cf3207f is described below
commit cf3207f4637d80efbcbc0f7646ea326d9f9bac6b
Author: Baodi Shi <ba...@apache.org>
AuthorDate: Mon Feb 20 22:32:39 2023 +0800
[feat] Support WaitForExclusive producer access mode. (#958)
* [feat] Support WaitForExclusive producer access mode.
* Remove useless defer.
---
pulsar/internal/connection.go | 17 +++++++++++++++--
pulsar/internal/rpc_client.go | 22 ++++++++++++++++------
pulsar/producer.go | 3 +++
pulsar/producer_partition.go | 2 ++
pulsar/producer_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++
5 files changed, 78 insertions(+), 8 deletions(-)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index cb4af33..67b6f32 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -523,8 +523,14 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl
c.handleResponse(cmd.Success.GetRequestId(), cmd)
case pb.BaseCommand_PRODUCER_SUCCESS:
- c.handleResponse(cmd.ProducerSuccess.GetRequestId(), cmd)
-
+ if !*cmd.ProducerSuccess.ProducerReady {
+ request, ok := c.findPendingRequest(cmd.ProducerSuccess.GetRequestId())
+ if ok {
+ request.callback(cmd, nil)
+ }
+ } else {
+ c.handleResponse(cmd.ProducerSuccess.GetRequestId(), cmd)
+ }
case pb.BaseCommand_PARTITIONED_METADATA_RESPONSE:
c.checkServerError(cmd.PartitionMetadataResponse.Error)
c.handleResponse(cmd.PartitionMetadataResponse.GetRequestId(), cmd)
@@ -748,6 +754,13 @@ func (c *connection) deletePendingRequest(requestID uint64) (*request, bool) {
return request, ok
}
+// findPendingRequest looks up the pending request registered under requestID
+// in c.pendingReqs and returns it without removing it from the map. The lookup
+// runs while holding c.pendingLock. The boolean result reports whether an
+// entry was present.
+func (c *connection) findPendingRequest(requestID uint64) (*request, bool) {
+	c.pendingLock.Lock()
+	defer c.pendingLock.Unlock()
+
+	pending, found := c.pendingReqs[requestID]
+	return pending, found
+}
+
func (c *connection) failPendingRequests(err error) bool {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 378ab4f..0ee8ca9 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -129,14 +129,24 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
Cnx: cnx,
Response: response,
}, err}
- close(ch)
})
- select {
- case res := <-ch:
- return res.RPCResult, res.error
- case <-time.After(c.requestTimeout):
- return nil, ErrRequestTimeOut
+ timeoutCh := time.After(c.requestTimeout)
+ for {
+ select {
+ case res := <-ch:
+ // Ignore a "producer not ready" response and
+ // keep waiting until the producer has been created successfully.
+ if res.error == nil && *res.RPCResult.Response.Type == pb.BaseCommand_PRODUCER_SUCCESS {
+ if !*res.RPCResult.Response.ProducerSuccess.ProducerReady {
+ timeoutCh = nil
+ break
+ }
+ }
+ return res.RPCResult, res.error
+ case <-timeoutCh:
+ return nil, ErrRequestTimeOut
+ }
}
}
diff --git a/pulsar/producer.go b/pulsar/producer.go
index d9ac34b..8fcb891 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -64,6 +64,9 @@ const (
// ProducerAccessModeExclusive is required exclusive access for producer.
// Fail immediately if there's already a producer connected.
ProducerAccessModeExclusive
+
+ // ProducerAccessModeWaitForExclusive makes producer creation pend until the producer can acquire exclusive access.
+ ProducerAccessModeWaitForExclusive
)
// TopicMetadata represents a topic metadata.
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index eece055..160693c 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1367,6 +1367,8 @@ func toProtoProducerAccessMode(accessMode ProducerAccessMode) pb.ProducerAccessM
return pb.ProducerAccessMode_Shared
case ProducerAccessModeExclusive:
return pb.ProducerAccessMode_Exclusive
+ case ProducerAccessModeWaitForExclusive:
+ return pb.ProducerAccessMode_WaitForExclusive
}
return pb.ProducerAccessMode_Shared
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index d7950eb..e69a14c 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1737,3 +1737,45 @@ func TestExclusiveProducer(t *testing.T) {
assert.Error(t, err, "Producer should be failed")
assert.True(t, strings.Contains(err.Error(), "ProducerBusy"))
}
+
+// TestWaitForExclusiveProducer checks that a producer created with
+// ProducerAccessModeWaitForExclusive stays pending, without failing on the
+// operation timeout, while another exclusive producer holds the topic, and
+// that it can send a message once that exclusive producer has been closed.
+func TestWaitForExclusiveProducer(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+		// Use a short (200ms) operation timeout so the test can outlast it.
+		OperationTimeout: 200 * time.Millisecond,
+	})
+	if !assert.NoError(t, err) {
+		return
+	}
+	defer client.Close()
+
+	topicName := newTopicName()
+	producer1, err := client.CreateProducer(ProducerOptions{
+		Topic:              topicName,
+		ProducerAccessMode: ProducerAccessModeExclusive,
+	})
+	// Stop here on failure: producer1 is closed further down and must be usable.
+	if !assert.NoError(t, err) || !assert.NotNil(t, producer1) {
+		return
+	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		// Deferred first so it runs last: wg.Wait() below can never hang if this
+		// goroutine exits early, and producer2 is closed before the test returns
+		// and the deferred client.Close() runs.
+		defer wg.Done()
+
+		producer2, err := client.CreateProducer(ProducerOptions{
+			Topic:              topicName,
+			ProducerAccessMode: ProducerAccessModeWaitForExclusive,
+		})
+		// Check the result before touching producer2; after a failed creation it
+		// may be nil, and calling Close or Send on it would panic.
+		if !assert.NoError(t, err) || !assert.NotNil(t, producer2) {
+			return
+		}
+		defer producer2.Close()
+
+		id, err := producer2.Send(context.Background(), &ProducerMessage{
+			Payload: make([]byte, 1024),
+		})
+		assert.Nil(t, err)
+		assert.NotNil(t, id)
+	}()
+	// The operation timeout is 200ms; keep the exclusive producer open for 300ms
+	// to show that the pending WaitForExclusive producer never times out.
+	time.Sleep(300 * time.Millisecond)
+	producer1.Close()
+	wg.Wait()
+}