You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/12/23 03:12:26 UTC
[pulsar-client-go] branch master updated: [Issue #401] Add
orderingKey apis (#427)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 9b20cc0 [Issue #401] Add orderingKey apis (#427)
9b20cc0 is described below
commit 9b20cc0841602f3767d32b19edb1db55fd0bb209
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Wed Dec 23 11:11:59 2020 +0800
[Issue #401] Add orderingKey apis (#427)
Fixes #401
### Motivation
According to apache/pulsar#4079, orderingKey was introduced to let users set message ordering manually. Currently, pulsar-client-go does not expose any related APIs to users, so we should add orderingKey-related APIs to pulsar-client-go.
### Modifications
- add OrderingKey to ProducerMessage
- add OrderingKey() to Message interface
- sync OrderingKey to SingleMessageMetadata
- tests
---
go.mod | 3 +-
go.sum | 8 +--
pulsar/consumer_impl.go | 1 +
pulsar/consumer_test.go | 119 ++++++++++++++++++++++++++++++++++++++++++-
pulsar/default_router.go | 5 ++
pulsar/dlq_router.go | 1 +
pulsar/impl_message.go | 5 ++
pulsar/message.go | 6 +++
pulsar/producer_partition.go | 4 ++
9 files changed, 145 insertions(+), 7 deletions(-)
diff --git a/go.mod b/go.mod
index 817e223..a3727a0 100644
--- a/go.mod
+++ b/go.mod
@@ -10,12 +10,11 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.4.2
+ github.com/google/uuid v1.1.2
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.10.8
github.com/kr/pretty v0.2.0 // indirect
github.com/linkedin/goavro/v2 v2.9.8
- github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
- github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/pierrec/lz4 v2.0.5+incompatible
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1
diff --git a/go.sum b/go.sum
index 1bcef52..c89a434 100644
--- a/go.sum
+++ b/go.sum
@@ -1,6 +1,5 @@
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
-github.com/99designs/keyring v1.1.5 h1:wLv7QyzYpFIyMSwOADq1CLTF9KbjbBfcnfmOGJ64aO4=
-github.com/99designs/keyring v1.1.5/go.mod h1:7hsVvt2qXgtadGevGJ4ujg+u8m6SpJ5TpHqTozIPqf0=
+github.com/99designs/keyring v1.1.6 h1:kVDC2uCgVwecxCk+9zoCt2uEL6dt+dfVzMvGgnVcIuM=
github.com/99designs/keyring v1.1.6/go.mod h1:16e0ds7LGQQcT59QqkTg72Hh5ShM51Byv5PEmW6uoRU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@@ -32,8 +31,7 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
-github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a h1:mq+R6XEM6lJX5VlLyZIrUSP8tSuJp82xTK89hvBwJbU=
-github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a/go.mod h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM=
+github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b h1:HBah4D48ypg3J7Np4N+HY/ZR76fx3HEUGxDU6Uk39oQ=
github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b/go.mod h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
@@ -65,6 +63,8 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
+github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 8456fd0..370ea12 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -459,6 +459,7 @@ func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) {
producerMsg: ProducerMessage{
Payload: msg.Payload(),
Key: msg.Key(),
+ OrderingKey: msg.OrderingKey(),
Properties: props,
DeliverAfter: delay,
},
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 6d58cd4..5b9b6e0 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -28,6 +28,7 @@ import (
"time"
"github.com/apache/pulsar-client-go/pulsar/internal"
+ "github.com/google/uuid"
"github.com/stretchr/testify/assert"
)
@@ -1877,5 +1878,121 @@ func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
receivedMessageIndex++
}
- // TODO: add OrderingKey support, see GH issue #401
+ // Test OrderingKey
+ for i := 0; i < MsgBatchCount; i++ {
+ for _, k := range keys {
+ u := uuid.New()
+ producer.SendAsync(ctx, &ProducerMessage{
+ Key: u.String(),
+ OrderingKey: k,
+ Payload: []byte(fmt.Sprintf("value-%d", i)),
+ }, func(id MessageID, producerMessage *ProducerMessage, err error) {
+ assert.Nil(t, err)
+ },
+ )
+ }
+ }
+
+ receivedKey = ""
+ receivedMessageIndex = 0
+ for i := 0; i < len(keys)*MsgBatchCount; i++ {
+ cm, ok := <-consumer1.Chan()
+ if !ok {
+ break
+ }
+ if receivedKey != cm.OrderingKey() {
+ receivedKey = cm.OrderingKey()
+ receivedMessageIndex = 0
+ }
+ assert.Equal(
+ t, fmt.Sprintf("value-%d", receivedMessageIndex%10),
+ string(cm.Payload()),
+ )
+ consumer1.Ack(cm.Message)
+ receivedMessageIndex++
+ }
+
+}
+
+func TestConsumerKeySharedWithOrderingKey(t *testing.T) {
+ client, err := NewClient(
+ ClientOptions{
+ URL: lookupURL,
+ },
+ )
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := "persistent://public/default/test-key-shared-with-ordering-key"
+
+ consumer1, err := client.Subscribe(
+ ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "sub-1",
+ Type: KeyShared,
+ },
+ )
+ assert.Nil(t, err)
+ defer consumer1.Close()
+
+ consumer2, err := client.Subscribe(
+ ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "sub-1",
+ Type: KeyShared,
+ },
+ )
+ assert.Nil(t, err)
+ defer consumer2.Close()
+
+ // create producer
+ producer, err := client.CreateProducer(
+ ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ },
+ )
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ ctx := context.Background()
+ for i := 0; i < 100; i++ {
+ u := uuid.New()
+ _, err := producer.Send(
+ ctx, &ProducerMessage{
+ Key: u.String(),
+ OrderingKey: fmt.Sprintf("key-shared-%d", i%3),
+ Payload: []byte(fmt.Sprintf("value-%d", i)),
+ },
+ )
+ assert.Nil(t, err)
+ }
+
+ receivedConsumer1 := 0
+ receivedConsumer2 := 0
+ for (receivedConsumer1 + receivedConsumer2) < 100 {
+ select {
+ case cm, ok := <-consumer1.Chan():
+ if !ok {
+ break
+ }
+ receivedConsumer1++
+ consumer1.Ack(cm.Message)
+ case cm, ok := <-consumer2.Chan():
+ if !ok {
+ break
+ }
+ receivedConsumer2++
+ consumer2.Ack(cm.Message)
+ }
+ }
+
+ assert.NotEqual(t, 0, receivedConsumer1)
+ assert.NotEqual(t, 0, receivedConsumer2)
+
+ fmt.Printf(
+ "TestConsumerKeySharedWithOrderingKey received messages consumer1: %d consumser2: %d\n",
+ receivedConsumer1, receivedConsumer2,
+ )
+ assert.Equal(t, 100, receivedConsumer1+receivedConsumer2)
}
diff --git a/pulsar/default_router.go b/pulsar/default_router.go
index 0e1a354..b5e24a6 100644
--- a/pulsar/default_router.go
+++ b/pulsar/default_router.go
@@ -55,6 +55,11 @@ func NewDefaultRouter(
return 0
}
+ if len(message.OrderingKey) != 0 {
+ // When an OrderingKey is specified, use the hash of that key
+ return int(hashFunc(message.OrderingKey) % numPartitions)
+ }
+
if len(message.Key) != 0 {
// When a key is specified, use the hash of that key
return int(hashFunc(message.Key) % numPartitions)
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index b4d98b0..40761e2 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -97,6 +97,7 @@ func (r *dlqRouter) run() {
producer.SendAsync(context.Background(), &ProducerMessage{
Payload: msg.Payload(),
Key: msg.Key(),
+ OrderingKey: msg.OrderingKey(),
Properties: msg.Properties(),
EventTime: msg.EventTime(),
ReplicationClusters: msg.replicationClusters,
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 27cb0ed..6fc9cad 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -170,6 +170,7 @@ type message struct {
publishTime time.Time
eventTime time.Time
key string
+ orderingKey string
producerName string
payLoad []byte
msgID MessageID
@@ -209,6 +210,10 @@ func (msg *message) Key() string {
return msg.key
}
+func (msg *message) OrderingKey() string {
+ return msg.orderingKey
+}
+
func (msg *message) RedeliveryCount() uint32 {
return msg.redeliveryCount
}
diff --git a/pulsar/message.go b/pulsar/message.go
index a3b2257..397c51e 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -33,6 +33,9 @@ type ProducerMessage struct {
// Key sets the key of the message for routing policy
Key string
+ // OrderingKey sets the ordering key of the message
+ OrderingKey string
+
// Properties attach application defined properties on the message
Properties map[string]string
@@ -93,6 +96,9 @@ type Message interface {
// Key get the key of the message, if any
Key() string
+ // OrderingKey get the ordering key of the message, if any
+ OrderingKey() string
+
// Get message redelivery count, redelivery count maintain in pulsar broker. When client nack acknowledge messages,
// broker will dispatch message again with message redelivery count in CommandMessage defined.
//
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 622c53e..f247e26 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -370,6 +370,10 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
smm.PartitionKey = proto.String(msg.Key)
}
+ if len(msg.OrderingKey) != 0 {
+ smm.OrderingKey = []byte(msg.OrderingKey)
+ }
+
if msg.Properties != nil {
smm.Properties = internal.ConvertFromStringMap(msg.Properties)
}