You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/05/17 15:14:35 UTC
[pulsar-client-go] branch master updated: Expose replicated from
filed on message struct (#251)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new e1c3822 Expose replicated from filed on message struct (#251)
e1c3822 is described below
commit e1c38224cd60bc7be197dc55b55f4b8149d035bd
Author: 冉小龙 <rx...@apache.org>
AuthorDate: Sun May 17 23:14:25 2020 +0800
Expose replicated from filed on message struct (#251)
Signed-off-by: xiaolong.ran <rx...@apache.org>
---
pulsar/consumer_partition.go | 2 ++
pulsar/impl_message.go | 9 +++++++++
pulsar/message.go | 6 ++++++
3 files changed, 17 insertions(+)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 498f2ae..4c40ae9 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -424,6 +424,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
msgID: msgID,
payLoad: payload,
replicationClusters: msgMeta.GetReplicateTo(),
+ replicatedFrom: msgMeta.GetReplicatedFrom(),
redeliveryCount: response.GetRedeliveryCount(),
}
} else {
@@ -436,6 +437,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
msgID: msgID,
payLoad: payload,
replicationClusters: msgMeta.GetReplicateTo(),
+ replicatedFrom: msgMeta.GetReplicatedFrom(),
redeliveryCount: response.GetRedeliveryCount(),
}
}
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 9b85c8a..3a5d4b6 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -145,6 +145,7 @@ type message struct {
properties map[string]string
topic string
replicationClusters []string
+ replicatedFrom string
redeliveryCount uint32
}
@@ -180,6 +181,14 @@ func (msg *message) RedeliveryCount() uint32 {
return msg.redeliveryCount
}
+func (msg *message) IsReplicated() bool {
+ return msg.replicatedFrom != ""
+}
+
+func (msg *message) GetReplicatedFrom() string {
+ return msg.replicatedFrom
+}
+
func newAckTracker(size int) *ackTracker {
var batchIDs *big.Int
if size <= 64 {
diff --git a/pulsar/message.go b/pulsar/message.go
index 8505321..6be35d2 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -93,6 +93,12 @@ type Message interface {
// Message redelivery increases monotonically in a broker, when topic switch ownership to a another broker
// redelivery count will be recalculated.
RedeliveryCount() uint32
+
+ // Check whether the message is replicated from other cluster.
+ IsReplicated() bool
+
+ // Get name of cluster, from which the message is replicated.
+ GetReplicatedFrom() string
}
// MessageID identifier for a particular message