You are viewing a plain-text version of this content. (The canonical link was not preserved in this plain-text copy.)
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/26 04:33:19 UTC

[pulsar-client-go] branch master updated: Support producer name for Message (#299)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f7ca912  Support producer name for Message (#299)
f7ca912 is described below

commit f7ca912897cc7d1a04dbc1602a40552dd2b8f77a
Author: 冉小龙 <rx...@apache.org>
AuthorDate: Fri Jun 26 12:33:09 2020 +0800

    Support producer name for Message (#299)
    
    * Support producer name for Message
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
    
    * add test case for producer name
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
    
    * fix ci error
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
    
    * fix comments
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
    
    * fix comments
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
---
 pulsar/consumer_partition.go |  2 ++
 pulsar/consumer_test.go      | 53 +++++++++++++++++++++++++++++++++++++++-----
 pulsar/impl_message.go       |  5 +++++
 pulsar/message.go            |  3 +++
 pulsar/producer_test.go      |  6 ++---
 5 files changed, 61 insertions(+), 8 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index bb4045e..b4498f6 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -417,6 +417,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 				publishTime:         timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
 				eventTime:           timeFromUnixTimestampMillis(smm.GetEventTime()),
 				key:                 smm.GetPartitionKey(),
+				producerName:        msgMeta.GetProducerName(),
 				properties:          internal.ConvertToStringMap(smm.GetProperties()),
 				topic:               pc.topic,
 				msgID:               msgID,
@@ -430,6 +431,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 				publishTime:         timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
 				eventTime:           timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
 				key:                 msgMeta.GetPartitionKey(),
+				producerName:        msgMeta.GetProducerName(),
 				properties:          internal.ConvertToStringMap(msgMeta.GetProperties()),
 				topic:               pc.topic,
 				msgID:               msgID,
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 3a06cd1..d598dd3 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1275,14 +1275,12 @@ func TestConsumerAddTopicPartitions(t *testing.T) {
 	assert.Equal(t, len(msgs), 10)
 }
 
-func TestConsumterNegativeRecieverQueueSize(t *testing.T) {
-	assert := assert.New(t)
-
+func TestConsumerNegativeReceiverQueueSize(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: lookupURL,
 	})
 
-	assert.Nil(err)
+	assert.Nil(t, err)
 	defer client.Close()
 
 	topic := newTopicName()
@@ -1297,5 +1295,50 @@ func TestConsumterNegativeRecieverQueueSize(t *testing.T) {
 		}
 	}()
 
-	assert.Nil(err)
+	assert.Nil(t, err)
+}
+
+func TestProducerName(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+	producerName := "test-producer-name"
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+		Name:  producerName,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	// create consumer
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "my-sub",
+	})
+
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	// publish 10 messages to topic
+	ctx := context.Background()
+	for i := 0; i < 10; i++ {
+		_, err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		})
+		assert.Nil(t, err)
+	}
+
+	for i := 0; i < 10; i++ {
+		msg, err := consumer.Receive(ctx)
+		assert.Nil(t, err)
+
+		assert.Equal(t, msg.ProducerName(), producerName)
+		consumer.Ack(msg)
+	}
 }
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index a9b1950..6aa4223 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -140,6 +140,7 @@ type message struct {
 	publishTime         time.Time
 	eventTime           time.Time
 	key                 string
+	producerName        string
 	payLoad             []byte
 	msgID               MessageID
 	properties          map[string]string
@@ -189,6 +190,10 @@ func (msg *message) GetReplicatedFrom() string {
 	return msg.replicatedFrom
 }
 
+func (msg *message) ProducerName() string {
+	return msg.producerName
+}
+
 func newAckTracker(size int) *ackTracker {
 	var batchIDs *big.Int
 	if size <= 64 {
diff --git a/pulsar/message.go b/pulsar/message.go
index 6be35d2..5137836 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -64,6 +64,9 @@ type Message interface {
 	// Topic get the topic from which this message originated from
 	Topic() string
 
+	// ProducerName returns the name of the producer that has published the message.
+	ProducerName() string
+
 	// Properties are application defined key/value pairs that will be attached to the message.
 	// Return the properties attached to the message.
 	Properties() map[string]string
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 2c028c2..95e4ed1 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -244,7 +244,7 @@ func TestEventTime(t *testing.T) {
 
 	eventTime := timeFromUnixTimestampMillis(uint64(1565161612))
 	ID, err := producer.Send(context.Background(), &ProducerMessage{
-		Payload:   []byte(fmt.Sprintf("test-event-time")),
+		Payload:   []byte("test-event-time"),
 		EventTime: eventTime,
 	})
 	assert.Nil(t, err)
@@ -728,7 +728,7 @@ func TestDelayRelative(t *testing.T) {
 	defer consumer.Close()
 
 	ID, err := producer.Send(context.Background(), &ProducerMessage{
-		Payload:      []byte(fmt.Sprintf("test")),
+		Payload:      []byte("test"),
 		DeliverAfter: 3 * time.Second,
 	})
 	assert.Nil(t, err)
@@ -771,7 +771,7 @@ func TestDelayAbsolute(t *testing.T) {
 	defer consumer.Close()
 
 	ID, err := producer.Send(context.Background(), &ProducerMessage{
-		Payload:   []byte(fmt.Sprintf("test")),
+		Payload:   []byte("test"),
 		DeliverAt: time.Now().Add(3 * time.Second),
 	})
 	assert.Nil(t, err)