You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/06/24 09:07:55 UTC

[pulsar-client-go] branch master updated: support DisableReplication (#543)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f33b594  support DisableReplication (#543)
f33b594 is described below

commit f33b594034c9433faae78a144983ab3c8f785add
Author: Yuto Furuta <mz...@gmail.com>
AuthorDate: Thu Jun 24 18:07:47 2021 +0900

    support DisableReplication (#543)
    
    Co-authored-by: Yuto Furuta <yf...@yahoo-corp.jp>
---
 pulsar/message.go            | 3 +++
 pulsar/producer_partition.go | 5 +++++
 2 files changed, 8 insertions(+)

diff --git a/pulsar/message.go b/pulsar/message.go
index 2a4343f..23dfefb 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -48,6 +48,9 @@ type ProducerMessage struct {
 	// ReplicationClusters override the replication clusters for this message.
 	ReplicationClusters []string
 
+	// DisableReplication disables the replication for this message.
+	DisableReplication bool
+
 	// SequenceID set the sequence id to assign to the current message
 	SequenceID *int64
 
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 19aa06e..7dd176c 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -411,6 +411,11 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	if !sendAsBatch {
 		p.internalFlushCurrentBatch()
 	}
+
+	if msg.DisableReplication {
+		msg.ReplicationClusters = []string{"__local__"}
+	}
+
 	added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
 		msg.ReplicationClusters, deliverAt)
 	if !added {