You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ba...@apache.org on 2023/02/20 14:32:46 UTC

[pulsar-client-go] branch master updated: [feat] Support WaitForExclusive producer access mode. (#958)

This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new cf3207f  [feat] Support WaitForExclusive producer access mode. (#958)
cf3207f is described below

commit cf3207f4637d80efbcbc0f7646ea326d9f9bac6b
Author: Baodi Shi <ba...@apache.org>
AuthorDate: Mon Feb 20 22:32:39 2023 +0800

    [feat] Support WaitForExclusive producer access mode. (#958)
    
    * [feat] Support WaitForExclusive producer access mode.
    
    * Remove useless defer.
---
 pulsar/internal/connection.go | 17 +++++++++++++++--
 pulsar/internal/rpc_client.go | 22 ++++++++++++++++------
 pulsar/producer.go            |  3 +++
 pulsar/producer_partition.go  |  2 ++
 pulsar/producer_test.go       | 42 ++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 78 insertions(+), 8 deletions(-)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index cb4af33..67b6f32 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -523,8 +523,14 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl
 		c.handleResponse(cmd.Success.GetRequestId(), cmd)
 
 	case pb.BaseCommand_PRODUCER_SUCCESS:
-		c.handleResponse(cmd.ProducerSuccess.GetRequestId(), cmd)
-
+		if !*cmd.ProducerSuccess.ProducerReady {
+			request, ok := c.findPendingRequest(cmd.ProducerSuccess.GetRequestId())
+			if ok {
+				request.callback(cmd, nil)
+			}
+		} else {
+			c.handleResponse(cmd.ProducerSuccess.GetRequestId(), cmd)
+		}
 	case pb.BaseCommand_PARTITIONED_METADATA_RESPONSE:
 		c.checkServerError(cmd.PartitionMetadataResponse.Error)
 		c.handleResponse(cmd.PartitionMetadataResponse.GetRequestId(), cmd)
@@ -748,6 +754,13 @@ func (c *connection) deletePendingRequest(requestID uint64) (*request, bool) {
 	return request, ok
 }
 
+func (c *connection) findPendingRequest(requestID uint64) (*request, bool) {
+	c.pendingLock.Lock()
+	defer c.pendingLock.Unlock()
+	request, ok := c.pendingReqs[requestID]
+	return request, ok
+}
+
 func (c *connection) failPendingRequests(err error) bool {
 	c.pendingLock.Lock()
 	defer c.pendingLock.Unlock()
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 378ab4f..0ee8ca9 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -129,14 +129,24 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
 			Cnx:      cnx,
 			Response: response,
 		}, err}
-		close(ch)
 	})
 
-	select {
-	case res := <-ch:
-		return res.RPCResult, res.error
-	case <-time.After(c.requestTimeout):
-		return nil, ErrRequestTimeOut
+	timeoutCh := time.After(c.requestTimeout)
+	for {
+		select {
+		case res := <-ch:
+			// Ignoring producer not ready response.
+			// Continue to wait for the producer to create successfully
+			if res.error == nil && *res.RPCResult.Response.Type == pb.BaseCommand_PRODUCER_SUCCESS {
+				if !*res.RPCResult.Response.ProducerSuccess.ProducerReady {
+					timeoutCh = nil
+					break
+				}
+			}
+			return res.RPCResult, res.error
+		case <-timeoutCh:
+			return nil, ErrRequestTimeOut
+		}
 	}
 }
 
diff --git a/pulsar/producer.go b/pulsar/producer.go
index d9ac34b..8fcb891 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -64,6 +64,9 @@ const (
 	// ProducerAccessModeExclusive is required exclusive access for producer.
 	// Fail immediately if there's already a producer connected.
 	ProducerAccessModeExclusive
+
+	// ProducerAccessModeWaitForExclusive is pending until producer can acquire exclusive access.
+	ProducerAccessModeWaitForExclusive
 )
 
 // TopicMetadata represents a topic metadata.
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index eece055..160693c 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1367,6 +1367,8 @@ func toProtoProducerAccessMode(accessMode ProducerAccessMode) pb.ProducerAccessM
 		return pb.ProducerAccessMode_Shared
 	case ProducerAccessModeExclusive:
 		return pb.ProducerAccessMode_Exclusive
+	case ProducerAccessModeWaitForExclusive:
+		return pb.ProducerAccessMode_WaitForExclusive
 	}
 
 	return pb.ProducerAccessMode_Shared
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index d7950eb..e69a14c 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1737,3 +1737,45 @@ func TestExclusiveProducer(t *testing.T) {
 	assert.Error(t, err, "Producer should be failed")
 	assert.True(t, strings.Contains(err.Error(), "ProducerBusy"))
 }
+
+func TestWaitForExclusiveProducer(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+		// Set the request timeout is 200ms
+		OperationTimeout: 200 * time.Millisecond,
+	})
+	assert.NoError(t, err)
+	defer client.Close()
+
+	topicName := newTopicName()
+	producer1, err := client.CreateProducer(ProducerOptions{
+		Topic:              topicName,
+		ProducerAccessMode: ProducerAccessModeExclusive,
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer1)
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		producer2, err := client.CreateProducer(ProducerOptions{
+			Topic:              topicName,
+			ProducerAccessMode: ProducerAccessModeWaitForExclusive,
+		})
+		defer producer2.Close()
+		assert.NoError(t, err)
+		assert.NotNil(t, producer2)
+
+		id, err := producer2.Send(context.Background(), &ProducerMessage{
+			Payload: make([]byte, 1024),
+		})
+		assert.Nil(t, err)
+		assert.NotNil(t, id)
+		wg.Done()
+	}()
+	// Because set the request timeout is 200ms before.
+	// Here waite 300ms to cover wait for exclusive producer never timeout
+	time.Sleep(300 * time.Millisecond)
+	producer1.Close()
+	wg.Wait()
+}