You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/05/17 15:14:35 UTC

[pulsar-client-go] branch master updated: Expose replicated from filed on message struct (#251)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new e1c3822  Expose replicated from filed on message struct (#251)
e1c3822 is described below

commit e1c38224cd60bc7be197dc55b55f4b8149d035bd
Author: 冉小龙 <rx...@apache.org>
AuthorDate: Sun May 17 23:14:25 2020 +0800

    Expose replicated from filed on message struct (#251)
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
---
 pulsar/consumer_partition.go | 2 ++
 pulsar/impl_message.go       | 9 +++++++++
 pulsar/message.go            | 6 ++++++
 3 files changed, 17 insertions(+)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 498f2ae..4c40ae9 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -424,6 +424,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 				msgID:               msgID,
 				payLoad:             payload,
 				replicationClusters: msgMeta.GetReplicateTo(),
+				replicatedFrom:      msgMeta.GetReplicatedFrom(),
 				redeliveryCount:     response.GetRedeliveryCount(),
 			}
 		} else {
@@ -436,6 +437,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 				msgID:               msgID,
 				payLoad:             payload,
 				replicationClusters: msgMeta.GetReplicateTo(),
+				replicatedFrom:      msgMeta.GetReplicatedFrom(),
 				redeliveryCount:     response.GetRedeliveryCount(),
 			}
 		}
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 9b85c8a..3a5d4b6 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -145,6 +145,7 @@ type message struct {
 	properties          map[string]string
 	topic               string
 	replicationClusters []string
+	replicatedFrom      string
 	redeliveryCount     uint32
 }
 
@@ -180,6 +181,14 @@ func (msg *message) RedeliveryCount() uint32 {
 	return msg.redeliveryCount
 }
 
+func (msg *message) IsReplicated() bool {
+	return msg.replicatedFrom != ""
+}
+
+func (msg *message) GetReplicatedFrom() string {
+	return msg.replicatedFrom
+}
+
 func newAckTracker(size int) *ackTracker {
 	var batchIDs *big.Int
 	if size <= 64 {
diff --git a/pulsar/message.go b/pulsar/message.go
index 8505321..6be35d2 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -93,6 +93,12 @@ type Message interface {
 	// Message redelivery increases monotonically in a broker, when topic switch ownership to a another broker
 	// redelivery count will be recalculated.
 	RedeliveryCount() uint32
+
+	// Check whether the message is replicated from other cluster.
+	IsReplicated() bool
+
+	// Get name of cluster, from which the message is replicated.
+	GetReplicatedFrom() string
 }
 
 // MessageID identifier for a particular message