You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2019/08/07 13:05:37 UTC

[pulsar-client-go] branch master updated: [Issue:45] Fix the logic of message router (#46)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new dffa055  [Issue:45] Fix the logic of message router (#46)
dffa055 is described below

commit dffa0553782af2f0f9dcfa37291faf298f731483
Author: 冉小龙 <ra...@gmail.com>
AuthorDate: Wed Aug 7 21:05:32 2019 +0800

    [Issue:45] Fix the logic of message router (#46)
    
    * [Issue:45] Fix the logic of message router
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
    
    * add unit test case for TopicPartitions
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
---
 pulsar/impl_client_test.go | 32 +++++++++++++++++++++++++++++++
 pulsar/impl_producer.go    |  2 ++
 pulsar/producer.go         |  2 +-
 pulsar/producer_test.go    | 47 ++++++++++++++++++++++++++++++++++++++++++++++
 pulsar/test_helper.go      | 23 +++++++++++++++++++++++
 5 files changed, 105 insertions(+), 1 deletion(-)

diff --git a/pulsar/impl_client_test.go b/pulsar/impl_client_test.go
index ec3110a..7f6c4fc 100644
--- a/pulsar/impl_client_test.go
+++ b/pulsar/impl_client_test.go
@@ -18,6 +18,7 @@
 package pulsar
 
 import (
+	"fmt"
 	"io/ioutil"
 	"testing"
 
@@ -199,3 +200,34 @@ func TestTokenAuthFromFile(t *testing.T) {
 	err = client.Close()
 	assert.NoError(t, err)
 }
+
+// TestTopicPartitions checks TopicPartitions for a 5-partition topic and for a non-partitioned topic (uses the localhost endpoints below).
+func TestTopicPartitions(t *testing.T) {
+	client, err := NewClient(ClientOptions{URL: "pulsar://localhost:6650"})
+	if err != nil {
+		t.Fatal(err) // do not defer Close on, or otherwise use, a client that failed to build
+	}
+	defer client.Close()
+
+	// Create topic with 5 partitions
+	httpPut("http://localhost:8080/admin/v2/persistent/public/default/TestGetTopicPartitions/partitions",
+		5)
+
+	partitionedTopic := "persistent://public/default/TestGetTopicPartitions"
+	partitions, err := client.TopicPartitions(partitionedTopic)
+	assert.NoError(t, err)
+	if len(partitions) != 5 {
+		t.Fatalf("expected 5 partitions, got %d", len(partitions))
+	}
+	for i, partition := range partitions {
+		assert.Equal(t, fmt.Sprintf("%s-partition-%d", partitionedTopic, i), partition)
+	}
+
+	// Non-Partitioned topic
+	topic := "persistent://public/default/TestGetTopicPartitions-nopartitions"
+	partitions, err = client.TopicPartitions(topic)
+	assert.NoError(t, err)
+	if assert.Equal(t, 1, len(partitions)) { // only index once the length is confirmed
+		assert.Equal(t, topic, partitions[0])
+	}
+}
diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go
index 75ed9c5..169d844 100644
--- a/pulsar/impl_producer.go
+++ b/pulsar/impl_producer.go
@@ -57,6 +57,8 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
 		p.messageRouter = func(message *ProducerMessage, metadata TopicMetadata) int {
 			return internalRouter(message.Key, metadata.NumPartitions())
 		}
+	} else {
+		p.messageRouter = options.MessageRouter
 	}
 
 	partitions, err := client.TopicPartitions(options.Topic)
diff --git a/pulsar/producer.go b/pulsar/producer.go
index c77a047..ff27d3a 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -107,7 +107,7 @@ type ProducerOptions struct {
 	// MessageRouter set a custom message routing policy by passing an implementation of MessageRouter
 	// The router is a function that given a particular message and the topic metadata, returns the
 	// partition index where the message should be routed to
-	MessageRouter func(Message, TopicMetadata) int
+	MessageRouter func(*ProducerMessage, TopicMetadata) int
 
 	// DisableBatching control whether automatic batching of messages is enabled for the producer. By default batching
 	// is enabled.
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 0b17b37..66683d0 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"testing"
 	"time"
@@ -179,3 +180,49 @@ func TestProducerLastSequenceID(t *testing.T) {
 	err = client.Close()
 	assert.NoError(t, err)
 }
+
+// TestMessageRouter checks that a message sent through a custom MessageRouter returning 2 is received from partition 2.
+func TestMessageRouter(t *testing.T) {
+	// Create topic with 5 partitions
+	httpPut("http://localhost:8080/admin/v2/persistent/public/default/my-partitioned-topic/partitions", 5)
+	client, err := NewClient(ClientOptions{URL: serviceURL})
+	if err != nil {
+		t.Fatal(err) // each setup failure is fatal so no Close is deferred on a value that failed to build
+	}
+	defer client.Close()
+
+	// Only subscribe on the specific partition
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            "my-partitioned-topic-partition-2",
+		SubscriptionName: "my-sub",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer consumer.Close()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: "my-partitioned-topic",
+		MessageRouter: func(msg *ProducerMessage, tm TopicMetadata) int {
+			fmt.Println("Routing message ", msg, " -- Partitions: ", tm.NumPartitions())
+			return 2
+		},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer producer.Close()
+	ctx := context.Background()
+	err = producer.Send(ctx, &ProducerMessage{
+		Payload: []byte("hello"),
+	})
+	if err != nil {
+		t.Fatal(err) // nothing was published, so there is no message to wait for below
+	}
+	fmt.Println("PUBLISHED")
+	// Verify message was published on partition 2
+	msg, err := consumer.Receive(ctx)
+	if assert.NoError(t, err) && assert.NotNil(t, msg) { // guard the Payload call against a nil msg
+		assert.Equal(t, "hello", string(msg.Payload()))
+	}
+}
diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go
index c58c239..1329be1 100644
--- a/pulsar/test_helper.go
+++ b/pulsar/test_helper.go
@@ -18,7 +18,11 @@
 package pulsar
 
 import (
+	"bytes"
+	"encoding/json"
 	"fmt"
+	"log"
+	"net/http"
 	"time"
 )
 
@@ -39,3 +43,22 @@ func newTopicName() string {
 func newAuthTopicName() string {
 	return fmt.Sprintf("private/auth/my-topic-%v", time.Now().Nanosecond())
 }
+
+// httpPut sends body, JSON-encoded, as an HTTP PUT to url; a failure to encode, build or send the request is fatal.
+func httpPut(url string, body interface{}) {
+	data, err := json.Marshal(body)
+	if err != nil {
+		log.Fatal(err)
+	}
+	req, err := http.NewRequest(http.MethodPut, url, bytes.NewReader(data))
+	if err != nil {
+		log.Fatal(err)
+	}
+	req.Header.Set("Content-Type", "application/json")
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		log.Fatal(err)
+	}
+	// The response status is deliberately left unchecked, as before; close the body so the connection is released.
+	resp.Body.Close()
+}