You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/06/24 09:07:55 UTC
[pulsar-client-go] branch master updated: support
DisableReplication (#543)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new f33b594 support DisableReplication (#543)
f33b594 is described below
commit f33b594034c9433faae78a144983ab3c8f785add
Author: Yuto Furuta <mz...@gmail.com>
AuthorDate: Thu Jun 24 18:07:47 2021 +0900
support DisableReplication (#543)
Co-authored-by: Yuto Furuta <yf...@yahoo-corp.jp>
---
pulsar/message.go | 3 +++
pulsar/producer_partition.go | 5 +++++
2 files changed, 8 insertions(+)
diff --git a/pulsar/message.go b/pulsar/message.go
index 2a4343f..23dfefb 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -48,6 +48,9 @@ type ProducerMessage struct {
// ReplicationClusters override the replication clusters for this message.
ReplicationClusters []string
+ // Disable the replication for this message
+ DisableReplication bool
+
// SequenceID set the sequence id to assign to the current message
SequenceID *int64
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 19aa06e..7dd176c 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -411,6 +411,11 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
if !sendAsBatch {
p.internalFlushCurrentBatch()
}
+
+ if msg.DisableReplication {
+ msg.ReplicationClusters = []string{"__local__"}
+ }
+
added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
msg.ReplicationClusters, deliverAt)
if !added {