You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/26 04:33:19 UTC
[pulsar-client-go] branch master updated: Support producer name for Message (#299)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new f7ca912 Support producer name for Message (#299)
f7ca912 is described below
commit f7ca912897cc7d1a04dbc1602a40552dd2b8f77a
Author: 冉小龙 <rx...@apache.org>
AuthorDate: Fri Jun 26 12:33:09 2020 +0800
Support producer name for Message (#299)
* Support producer name for Message
Signed-off-by: xiaolong.ran <rx...@apache.org>
* add test case for producer name
Signed-off-by: xiaolong.ran <rx...@apache.org>
* fix ci error
Signed-off-by: xiaolong.ran <rx...@apache.org>
* fix comments
Signed-off-by: xiaolong.ran <rx...@apache.org>
* fix comments
Signed-off-by: xiaolong.ran <rx...@apache.org>
---
pulsar/consumer_partition.go | 2 ++
pulsar/consumer_test.go | 53 +++++++++++++++++++++++++++++++++++++++-----
pulsar/impl_message.go | 5 +++++
pulsar/message.go | 3 +++
pulsar/producer_test.go | 6 ++---
5 files changed, 61 insertions(+), 8 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index bb4045e..b4498f6 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -417,6 +417,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
eventTime: timeFromUnixTimestampMillis(smm.GetEventTime()),
key: smm.GetPartitionKey(),
+ producerName: msgMeta.GetProducerName(),
properties: internal.ConvertToStringMap(smm.GetProperties()),
topic: pc.topic,
msgID: msgID,
@@ -430,6 +431,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
key: msgMeta.GetPartitionKey(),
+ producerName: msgMeta.GetProducerName(),
properties: internal.ConvertToStringMap(msgMeta.GetProperties()),
topic: pc.topic,
msgID: msgID,
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 3a06cd1..d598dd3 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1275,14 +1275,12 @@ func TestConsumerAddTopicPartitions(t *testing.T) {
assert.Equal(t, len(msgs), 10)
}
-func TestConsumterNegativeRecieverQueueSize(t *testing.T) {
- assert := assert.New(t)
-
+func TestConsumerNegativeReceiverQueueSize(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
- assert.Nil(err)
+ assert.Nil(t, err)
defer client.Close()
topic := newTopicName()
@@ -1297,5 +1295,50 @@ func TestConsumterNegativeRecieverQueueSize(t *testing.T) {
}
}()
- assert.Nil(err)
+ assert.Nil(t, err)
+}
+
+func TestProducerName(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ producerName := "test-producer-name"
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Name: producerName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // create consumer
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ })
+
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ // publish 10 messages to topic
+ ctx := context.Background()
+ for i := 0; i < 10; i++ {
+ _, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ })
+ assert.Nil(t, err)
+ }
+
+ for i := 0; i < 10; i++ {
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+
+ assert.Equal(t, msg.ProducerName(), producerName)
+ consumer.Ack(msg)
+ }
}
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index a9b1950..6aa4223 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -140,6 +140,7 @@ type message struct {
publishTime time.Time
eventTime time.Time
key string
+ producerName string
payLoad []byte
msgID MessageID
properties map[string]string
@@ -189,6 +190,10 @@ func (msg *message) GetReplicatedFrom() string {
return msg.replicatedFrom
}
+func (msg *message) ProducerName() string {
+ return msg.producerName
+}
+
func newAckTracker(size int) *ackTracker {
var batchIDs *big.Int
if size <= 64 {
diff --git a/pulsar/message.go b/pulsar/message.go
index 6be35d2..5137836 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -64,6 +64,9 @@ type Message interface {
// Topic get the topic from which this message originated from
Topic() string
+ // ProducerName returns the name of the producer that has published the message.
+ ProducerName() string
+
// Properties are application defined key/value pairs that will be attached to the message.
// Return the properties attached to the message.
Properties() map[string]string
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 2c028c2..95e4ed1 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -244,7 +244,7 @@ func TestEventTime(t *testing.T) {
eventTime := timeFromUnixTimestampMillis(uint64(1565161612))
ID, err := producer.Send(context.Background(), &ProducerMessage{
- Payload: []byte(fmt.Sprintf("test-event-time")),
+ Payload: []byte("test-event-time"),
EventTime: eventTime,
})
assert.Nil(t, err)
@@ -728,7 +728,7 @@ func TestDelayRelative(t *testing.T) {
defer consumer.Close()
ID, err := producer.Send(context.Background(), &ProducerMessage{
- Payload: []byte(fmt.Sprintf("test")),
+ Payload: []byte("test"),
DeliverAfter: 3 * time.Second,
})
assert.Nil(t, err)
@@ -771,7 +771,7 @@ func TestDelayAbsolute(t *testing.T) {
defer consumer.Close()
ID, err := producer.Send(context.Background(), &ProducerMessage{
- Payload: []byte(fmt.Sprintf("test")),
+ Payload: []byte("test"),
DeliverAt: time.Now().Add(3 * time.Second),
})
assert.Nil(t, err)