You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2023/02/09 03:04:45 UTC
[pulsar-client-go] branch master updated: [feat] Support Exclusive Producer access mode. (#944)
This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 93a5a76 [feat] Support Exclusive Producer access mode. (#944)
93a5a76 is described below
commit 93a5a765b9ff9d756d0adc5d2de30f31e4cbf29f
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Thu Feb 9 11:04:39 2023 +0800
[feat] Support Exclusive Producer access mode. (#944)
* [feat] Support Exclusive Producer access mode.
* Use uint ptr instead int64
---
pulsar/producer.go | 17 +++++++++++++++++
pulsar/producer_partition.go | 19 +++++++++++++++++++
pulsar/producer_test.go | 32 ++++++++++++++++++++++++++++++++
3 files changed, 68 insertions(+)
diff --git a/pulsar/producer.go b/pulsar/producer.go
index d088fb2..d9ac34b 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -55,6 +55,17 @@ const (
Better
)
+type ProducerAccessMode int
+
+const (
+ // ProducerAccessModeShared is default multiple producers can publish on a topic.
+ ProducerAccessModeShared ProducerAccessMode = iota
+
+ // ProducerAccessModeExclusive is required exclusive access for producer.
+ // Fail immediately if there's already a producer connected.
+ ProducerAccessModeExclusive
+)
+
// TopicMetadata represents a topic metadata.
type TopicMetadata interface {
// NumPartitions returns the number of partitions for a particular topic.
@@ -187,6 +198,12 @@ type ProducerOptions struct {
// ChunkMaxMessageSize is the max size of single chunk payload.
// It will actually only take effect if it is smaller than the maxMessageSize from the broker.
ChunkMaxMessageSize uint
+
+ // The type of access to the topic that the producer requires. (default ProducerAccessModeShared)
+ // Options:
+ // - ProducerAccessModeShared
+ // - ProducerAccessModeExclusive
+ ProducerAccessMode
}
// Producer is used to publish messages on a topic
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b0467f5..eece055 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -95,6 +95,7 @@ type partitionProducer struct {
metrics *internal.LeveledMetrics
epoch uint64
schemaCache *schemaCache
+ topicEpoch *uint64
}
type schemaCache struct {
@@ -237,6 +238,11 @@ func (p *partitionProducer) grabCnx() error {
Schema: pbSchema,
Epoch: proto.Uint64(atomic.LoadUint64(&p.epoch)),
UserProvidedProducerName: proto.Bool(p.userProvidedProducerName),
+ ProducerAccessMode: toProtoProducerAccessMode(p.options.ProducerAccessMode).Enum(),
+ }
+
+ if p.topicEpoch != nil {
+ cmdProducer.TopicEpoch = proto.Uint64(*p.topicEpoch)
}
if p.producerName != "" {
@@ -253,6 +259,8 @@ func (p *partitionProducer) grabCnx() error {
}
p.producerName = res.Response.ProducerSuccess.GetProducerName()
+ nextTopicEpoch := res.Response.ProducerSuccess.GetTopicEpoch()
+ p.topicEpoch = &nextTopicEpoch
if p.options.Encryption != nil {
p.encryptor = internalcrypto.NewProducerEncryptor(p.options.Encryption.Keys,
@@ -1352,3 +1360,14 @@ func (c *chunkRecorder) setFirstChunkID(msgID messageID) {
func (c *chunkRecorder) setLastChunkID(msgID messageID) {
c.chunkedMsgID.messageID = msgID
}
+
+func toProtoProducerAccessMode(accessMode ProducerAccessMode) pb.ProducerAccessMode {
+ switch accessMode {
+ case ProducerAccessModeShared:
+ return pb.ProducerAccessMode_Shared
+ case ProducerAccessModeExclusive:
+ return pb.ProducerAccessMode_Exclusive
+ }
+
+ return pb.ProducerAccessMode_Shared
+}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 75fc6db..d7950eb 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -23,6 +23,7 @@ import (
"fmt"
"net/http"
"strconv"
+ "strings"
"sync"
"sync/atomic"
"testing"
@@ -1705,3 +1706,34 @@ func TestProducerWithSchemaAndConsumerSchemaNotFound(t *testing.T) {
// should fail with error but not panic
assert.Error(t, err)
}
+
+func TestExclusiveProducer(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ topicName := newTopicName()
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ ProducerAccessMode: ProducerAccessModeExclusive,
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+ defer producer.Close()
+
+ _, err = client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ ProducerAccessMode: ProducerAccessModeExclusive,
+ })
+ assert.Error(t, err, "Producer should be fenced")
+ assert.True(t, strings.Contains(err.Error(), "ProducerFenced"))
+
+ _, err = client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Error(t, err, "Producer should be failed")
+ assert.True(t, strings.Contains(err.Error(), "ProducerBusy"))
+}