You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2019/08/07 13:05:37 UTC
[pulsar-client-go] branch master updated: [Issue:45] Fix the logic of message router (#46)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new dffa055 [Issue:45] Fix the logic of message router (#46)
dffa055 is described below
commit dffa0553782af2f0f9dcfa37291faf298f731483
Author: 冉小龙 <ra...@gmail.com>
AuthorDate: Wed Aug 7 21:05:32 2019 +0800
[Issue:45] Fix the logic of message router (#46)
* [Issue:45] Fix the logic of message router
Signed-off-by: xiaolong.ran <ra...@gmail.com>
* add unit test case for TopicPartitions
Signed-off-by: xiaolong.ran <ra...@gmail.com>
---
pulsar/impl_client_test.go | 32 +++++++++++++++++++++++++++++++
pulsar/impl_producer.go | 2 ++
pulsar/producer.go | 2 +-
pulsar/producer_test.go | 47 ++++++++++++++++++++++++++++++++++++++++++++++
pulsar/test_helper.go | 23 +++++++++++++++++++++++
5 files changed, 105 insertions(+), 1 deletion(-)
diff --git a/pulsar/impl_client_test.go b/pulsar/impl_client_test.go
index ec3110a..7f6c4fc 100644
--- a/pulsar/impl_client_test.go
+++ b/pulsar/impl_client_test.go
@@ -18,6 +18,7 @@
package pulsar
import (
+ "fmt"
"io/ioutil"
"testing"
@@ -199,3 +200,34 @@ func TestTokenAuthFromFile(t *testing.T) {
err = client.Close()
assert.NoError(t, err)
}
+
+func TestTopicPartitions(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ // Create topic with 5 partitions
+ httpPut("http://localhost:8080/admin/v2/persistent/public/default/TestGetTopicPartitions/partitions",
+ 5)
+
+ partitionedTopic := "persistent://public/default/TestGetTopicPartitions"
+
+ partitions, err := client.TopicPartitions(partitionedTopic)
+ assert.Nil(t, err)
+ assert.Equal(t, len(partitions), 5)
+ for i := 0; i < 5; i++ {
+ assert.Equal(t, partitions[i],
+ fmt.Sprintf("%s-partition-%d", partitionedTopic, i))
+ }
+
+ // Non-Partitioned topic
+ topic := "persistent://public/default/TestGetTopicPartitions-nopartitions"
+
+ partitions, err = client.TopicPartitions(topic)
+ assert.Nil(t, err)
+ assert.Equal(t, len(partitions), 1)
+ assert.Equal(t, partitions[0], topic)
+}
diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go
index 75ed9c5..169d844 100644
--- a/pulsar/impl_producer.go
+++ b/pulsar/impl_producer.go
@@ -57,6 +57,8 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
p.messageRouter = func(message *ProducerMessage, metadata TopicMetadata) int {
return internalRouter(message.Key, metadata.NumPartitions())
}
+ } else {
+ p.messageRouter = options.MessageRouter
}
partitions, err := client.TopicPartitions(options.Topic)
diff --git a/pulsar/producer.go b/pulsar/producer.go
index c77a047..ff27d3a 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -107,7 +107,7 @@ type ProducerOptions struct {
// MessageRouter set a custom message routing policy by passing an implementation of MessageRouter
// The router is a function that given a particular message and the topic metadata, returns the
// partition index where the message should be routed to
- MessageRouter func(Message, TopicMetadata) int
+ MessageRouter func(*ProducerMessage, TopicMetadata) int
// DisableBatching control whether automatic batching of messages is enabled for the producer. By default batching
// is enabled.
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 0b17b37..66683d0 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -19,6 +19,7 @@ package pulsar
import (
"context"
+ "fmt"
"sync"
"testing"
"time"
@@ -179,3 +180,49 @@ func TestProducerLastSequenceID(t *testing.T) {
err = client.Close()
assert.NoError(t, err)
}
+
+func TestMessageRouter(t *testing.T) {
+ // Create topic with 5 partitions
+ httpPut("http://localhost:8080/admin/v2/persistent/public/default/my-partitioned-topic/partitions", 5)
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ // Only subscribe on the specific partition
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: "my-partitioned-topic-partition-2",
+ SubscriptionName: "my-sub",
+ })
+
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: "my-partitioned-topic",
+ MessageRouter: func(msg *ProducerMessage, tm TopicMetadata) int {
+ fmt.Println("Routing message ", msg, " -- Partitions: ", tm.NumPartitions())
+ return 2
+ },
+ })
+
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ ctx := context.Background()
+
+ err = producer.Send(ctx, &ProducerMessage{
+ Payload: []byte("hello"),
+ })
+ assert.Nil(t, err)
+
+ fmt.Println("PUBLISHED")
+
+ // Verify message was published on partition 2
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.NotNil(t, msg)
+ assert.Equal(t, string(msg.Payload()), "hello")
+}
diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go
index c58c239..1329be1 100644
--- a/pulsar/test_helper.go
+++ b/pulsar/test_helper.go
@@ -18,7 +18,11 @@
package pulsar
import (
+ "bytes"
+ "encoding/json"
"fmt"
+ "log"
+ "net/http"
"time"
)
@@ -39,3 +43,22 @@ func newTopicName() string {
func newAuthTopicName() string {
return fmt.Sprintf("private/auth/my-topic-%v", time.Now().Nanosecond())
}
+
+func httpPut(url string, body interface{}) {
+ client := http.Client{}
+
+ data, _ := json.Marshal(body)
+ req, err := http.NewRequest(http.MethodPut, url, bytes.NewReader(data))
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ req.Header = map[string][]string{
+ "Content-Type": {"application/json"},
+ }
+
+ _, err = client.Do(req)
+ if err != nil {
+ log.Fatal(err)
+ }
+}