You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2023/02/09 03:04:45 UTC

[pulsar-client-go] branch master updated: [feat] Support Exclusive Producer access mode. (#944)

This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 93a5a76  [feat] Support Exclusive Producer access mode. (#944)
93a5a76 is described below

commit 93a5a765b9ff9d756d0adc5d2de30f31e4cbf29f
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Thu Feb 9 11:04:39 2023 +0800

    [feat] Support Exclusive Producer access mode. (#944)
    
    * [feat] Support Exclusive Producer access mode.
    
    * Use uint ptr instead int64
---
 pulsar/producer.go           | 17 +++++++++++++++++
 pulsar/producer_partition.go | 19 +++++++++++++++++++
 pulsar/producer_test.go      | 32 ++++++++++++++++++++++++++++++++
 3 files changed, 68 insertions(+)

diff --git a/pulsar/producer.go b/pulsar/producer.go
index d088fb2..d9ac34b 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -55,6 +55,17 @@ const (
 	Better
 )
 
+type ProducerAccessMode int
+
+const (
+	// ProducerAccessModeShared is the default mode: multiple producers can publish on a topic.
+	ProducerAccessModeShared ProducerAccessMode = iota
+
+	// ProducerAccessModeExclusive requires exclusive access for the producer.
+	// Producer creation fails immediately if there's already a producer connected.
+	ProducerAccessModeExclusive
+)
+
 // TopicMetadata represents a topic metadata.
 type TopicMetadata interface {
 	// NumPartitions returns the number of partitions for a particular topic.
@@ -187,6 +198,12 @@ type ProducerOptions struct {
 	// ChunkMaxMessageSize is the max size of single chunk payload.
 	// It will actually only take effect if it is smaller than the maxMessageSize from the broker.
 	ChunkMaxMessageSize uint
+
+	// The type of access to the topic that the producer requires (default: ProducerAccessModeShared).
+	// Options:
+	// - ProducerAccessModeShared
+	// - ProducerAccessModeExclusive
+	ProducerAccessMode
 }
 
 // Producer is used to publish messages on a topic
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b0467f5..eece055 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -95,6 +95,7 @@ type partitionProducer struct {
 	metrics          *internal.LeveledMetrics
 	epoch            uint64
 	schemaCache      *schemaCache
+	topicEpoch       *uint64
 }
 
 type schemaCache struct {
@@ -237,6 +238,11 @@ func (p *partitionProducer) grabCnx() error {
 		Schema:                   pbSchema,
 		Epoch:                    proto.Uint64(atomic.LoadUint64(&p.epoch)),
 		UserProvidedProducerName: proto.Bool(p.userProvidedProducerName),
+		ProducerAccessMode:       toProtoProducerAccessMode(p.options.ProducerAccessMode).Enum(),
+	}
+
+	if p.topicEpoch != nil {
+		cmdProducer.TopicEpoch = proto.Uint64(*p.topicEpoch)
 	}
 
 	if p.producerName != "" {
@@ -253,6 +259,8 @@ func (p *partitionProducer) grabCnx() error {
 	}
 
 	p.producerName = res.Response.ProducerSuccess.GetProducerName()
+	nextTopicEpoch := res.Response.ProducerSuccess.GetTopicEpoch()
+	p.topicEpoch = &nextTopicEpoch
 
 	if p.options.Encryption != nil {
 		p.encryptor = internalcrypto.NewProducerEncryptor(p.options.Encryption.Keys,
@@ -1352,3 +1360,14 @@ func (c *chunkRecorder) setFirstChunkID(msgID messageID) {
 func (c *chunkRecorder) setLastChunkID(msgID messageID) {
 	c.chunkedMsgID.messageID = msgID
 }
+
+func toProtoProducerAccessMode(accessMode ProducerAccessMode) pb.ProducerAccessMode {
+	switch accessMode {
+	case ProducerAccessModeShared:
+		return pb.ProducerAccessMode_Shared
+	case ProducerAccessModeExclusive:
+		return pb.ProducerAccessMode_Exclusive
+	}
+
+	return pb.ProducerAccessMode_Shared
+}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 75fc6db..d7950eb 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -23,6 +23,7 @@ import (
 	"fmt"
 	"net/http"
 	"strconv"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -1705,3 +1706,34 @@ func TestProducerWithSchemaAndConsumerSchemaNotFound(t *testing.T) {
 	// should fail with error but not panic
 	assert.Error(t, err)
 }
+
+func TestExclusiveProducer(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+	})
+	assert.NoError(t, err)
+	defer client.Close()
+
+	topicName := newTopicName()
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:              topicName,
+		ProducerAccessMode: ProducerAccessModeExclusive,
+	})
+
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	_, err = client.CreateProducer(ProducerOptions{
+		Topic:              topicName,
+		ProducerAccessMode: ProducerAccessModeExclusive,
+	})
+	assert.Error(t, err, "Producer should be fenced")
+	assert.True(t, strings.Contains(err.Error(), "ProducerFenced"))
+
+	_, err = client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+	assert.Error(t, err, "Producer should be failed")
+	assert.True(t, strings.Contains(err.Error(), "ProducerBusy"))
+}