You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/12/23 03:12:26 UTC

[pulsar-client-go] branch master updated: [Issue #401] Add orderingKey apis (#427)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b20cc0  [Issue #401] Add orderingKey apis (#427)
9b20cc0 is described below

commit 9b20cc0841602f3767d32b19edb1db55fd0bb209
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Wed Dec 23 11:11:59 2020 +0800

    [Issue #401] Add orderingKey apis (#427)
    
    Fixes #401
    
    ### Motivation
    According to apache/pulsar#4079, orderingKey was introduced to let user set message order manually, currently pulsar-client-go do not have related apis exposed to user. We should add orderingKey related apis to pulsar-client-go.
    
    ### Modifications
    
    - add OrderingKey to ProducerMessage
    - add OrderingKey() to Message interface
    - sync OrderingKey to SingleMessageMetadata
    - tests
---
 go.mod                       |   3 +-
 go.sum                       |   8 +--
 pulsar/consumer_impl.go      |   1 +
 pulsar/consumer_test.go      | 119 ++++++++++++++++++++++++++++++++++++++++++-
 pulsar/default_router.go     |   5 ++
 pulsar/dlq_router.go         |   1 +
 pulsar/impl_message.go       |   5 ++
 pulsar/message.go            |   6 +++
 pulsar/producer_partition.go |   4 ++
 9 files changed, 145 insertions(+), 7 deletions(-)

diff --git a/go.mod b/go.mod
index 817e223..a3727a0 100644
--- a/go.mod
+++ b/go.mod
@@ -10,12 +10,11 @@ require (
 	github.com/davecgh/go-spew v1.1.1
 	github.com/gogo/protobuf v1.3.1
 	github.com/golang/protobuf v1.4.2
+	github.com/google/uuid v1.1.2
 	github.com/inconshreveable/mousetrap v1.0.0 // indirect
 	github.com/klauspost/compress v1.10.8
 	github.com/kr/pretty v0.2.0 // indirect
 	github.com/linkedin/goavro/v2 v2.9.8
-	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
-	github.com/modern-go/reflect2 v1.0.1 // indirect
 	github.com/pierrec/lz4 v2.0.5+incompatible
 	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_golang v1.7.1
diff --git a/go.sum b/go.sum
index 1bcef52..c89a434 100644
--- a/go.sum
+++ b/go.sum
@@ -1,6 +1,5 @@
 cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
-github.com/99designs/keyring v1.1.5 h1:wLv7QyzYpFIyMSwOADq1CLTF9KbjbBfcnfmOGJ64aO4=
-github.com/99designs/keyring v1.1.5/go.mod h1:7hsVvt2qXgtadGevGJ4ujg+u8m6SpJ5TpHqTozIPqf0=
+github.com/99designs/keyring v1.1.6 h1:kVDC2uCgVwecxCk+9zoCt2uEL6dt+dfVzMvGgnVcIuM=
 github.com/99designs/keyring v1.1.6/go.mod h1:16e0ds7LGQQcT59QqkTg72Hh5ShM51Byv5PEmW6uoRU=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@@ -32,8 +31,7 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC
 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
 github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
 github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
-github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a h1:mq+R6XEM6lJX5VlLyZIrUSP8tSuJp82xTK89hvBwJbU=
-github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a/go.mod h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM=
+github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b h1:HBah4D48ypg3J7Np4N+HY/ZR76fx3HEUGxDU6Uk39oQ=
 github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b/go.mod h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM=
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
@@ -65,6 +63,8 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
 github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
 github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
+github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
 github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
 github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 8456fd0..370ea12 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -459,6 +459,7 @@ func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) {
 			producerMsg: ProducerMessage{
 				Payload:      msg.Payload(),
 				Key:          msg.Key(),
+				OrderingKey:  msg.OrderingKey(),
 				Properties:   props,
 				DeliverAfter: delay,
 			},
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 6d58cd4..5b9b6e0 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -28,6 +28,7 @@ import (
 	"time"
 
 	"github.com/apache/pulsar-client-go/pulsar/internal"
+	"github.com/google/uuid"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -1877,5 +1878,121 @@ func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
 		receivedMessageIndex++
 	}
 
-	// TODO: add OrderingKey support, see GH issue #401
+	// Test OrderingKey
+	for i := 0; i < MsgBatchCount; i++ {
+		for _, k := range keys {
+			u := uuid.New()
+			producer.SendAsync(ctx, &ProducerMessage{
+				Key:         u.String(),
+				OrderingKey: k,
+				Payload:     []byte(fmt.Sprintf("value-%d", i)),
+			}, func(id MessageID, producerMessage *ProducerMessage, err error) {
+				assert.Nil(t, err)
+			},
+			)
+		}
+	}
+
+	receivedKey = ""
+	receivedMessageIndex = 0
+	for i := 0; i < len(keys)*MsgBatchCount; i++ {
+		cm, ok := <-consumer1.Chan()
+		if !ok {
+			break
+		}
+		if receivedKey != cm.OrderingKey() {
+			receivedKey = cm.OrderingKey()
+			receivedMessageIndex = 0
+		}
+		assert.Equal(
+			t, fmt.Sprintf("value-%d", receivedMessageIndex%10),
+			string(cm.Payload()),
+		)
+		consumer1.Ack(cm.Message)
+		receivedMessageIndex++
+	}
+
+}
+
+func TestConsumerKeySharedWithOrderingKey(t *testing.T) {
+	client, err := NewClient(
+		ClientOptions{
+			URL: lookupURL,
+		},
+	)
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := "persistent://public/default/test-key-shared-with-ordering-key"
+
+	consumer1, err := client.Subscribe(
+		ConsumerOptions{
+			Topic:            topic,
+			SubscriptionName: "sub-1",
+			Type:             KeyShared,
+		},
+	)
+	assert.Nil(t, err)
+	defer consumer1.Close()
+
+	consumer2, err := client.Subscribe(
+		ConsumerOptions{
+			Topic:            topic,
+			SubscriptionName: "sub-1",
+			Type:             KeyShared,
+		},
+	)
+	assert.Nil(t, err)
+	defer consumer2.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(
+		ProducerOptions{
+			Topic:           topic,
+			DisableBatching: true,
+		},
+	)
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	ctx := context.Background()
+	for i := 0; i < 100; i++ {
+		u := uuid.New()
+		_, err := producer.Send(
+			ctx, &ProducerMessage{
+				Key:         u.String(),
+				OrderingKey: fmt.Sprintf("key-shared-%d", i%3),
+				Payload:     []byte(fmt.Sprintf("value-%d", i)),
+			},
+		)
+		assert.Nil(t, err)
+	}
+
+	receivedConsumer1 := 0
+	receivedConsumer2 := 0
+	for (receivedConsumer1 + receivedConsumer2) < 100 {
+		select {
+		case cm, ok := <-consumer1.Chan():
+			if !ok {
+				break
+			}
+			receivedConsumer1++
+			consumer1.Ack(cm.Message)
+		case cm, ok := <-consumer2.Chan():
+			if !ok {
+				break
+			}
+			receivedConsumer2++
+			consumer2.Ack(cm.Message)
+		}
+	}
+
+	assert.NotEqual(t, 0, receivedConsumer1)
+	assert.NotEqual(t, 0, receivedConsumer2)
+
+	fmt.Printf(
+		"TestConsumerKeySharedWithOrderingKey received messages consumer1: %d consumser2: %d\n",
+		receivedConsumer1, receivedConsumer2,
+	)
+	assert.Equal(t, 100, receivedConsumer1+receivedConsumer2)
 }
diff --git a/pulsar/default_router.go b/pulsar/default_router.go
index 0e1a354..b5e24a6 100644
--- a/pulsar/default_router.go
+++ b/pulsar/default_router.go
@@ -55,6 +55,11 @@ func NewDefaultRouter(
 			return 0
 		}
 
+		if len(message.OrderingKey) != 0 {
+			// When an OrderingKey is specified, use the hash of that key
+			return int(hashFunc(message.OrderingKey) % numPartitions)
+		}
+
 		if len(message.Key) != 0 {
 			// When a key is specified, use the hash of that key
 			return int(hashFunc(message.Key) % numPartitions)
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index b4d98b0..40761e2 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -97,6 +97,7 @@ func (r *dlqRouter) run() {
 			producer.SendAsync(context.Background(), &ProducerMessage{
 				Payload:             msg.Payload(),
 				Key:                 msg.Key(),
+				OrderingKey:         msg.OrderingKey(),
 				Properties:          msg.Properties(),
 				EventTime:           msg.EventTime(),
 				ReplicationClusters: msg.replicationClusters,
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 27cb0ed..6fc9cad 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -170,6 +170,7 @@ type message struct {
 	publishTime         time.Time
 	eventTime           time.Time
 	key                 string
+	orderingKey         string
 	producerName        string
 	payLoad             []byte
 	msgID               MessageID
@@ -209,6 +210,10 @@ func (msg *message) Key() string {
 	return msg.key
 }
 
+func (msg *message) OrderingKey() string {
+	return msg.orderingKey
+}
+
 func (msg *message) RedeliveryCount() uint32 {
 	return msg.redeliveryCount
 }
diff --git a/pulsar/message.go b/pulsar/message.go
index a3b2257..397c51e 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -33,6 +33,9 @@ type ProducerMessage struct {
 	// Key sets the key of the message for routing policy
 	Key string
 
+	// OrderingKey sets the ordering key of the message
+	OrderingKey string
+
 	// Properties attach application defined properties on the message
 	Properties map[string]string
 
@@ -93,6 +96,9 @@ type Message interface {
 	// Key get the key of the message, if any
 	Key() string
 
+	// OrderingKey get the ordering key of the message, if any
+	OrderingKey() string
+
 	// Get message redelivery count, redelivery count maintain in pulsar broker. When client nack acknowledge messages,
 	// broker will dispatch message again with message redelivery count in CommandMessage defined.
 	//
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 622c53e..f247e26 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -370,6 +370,10 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		smm.PartitionKey = proto.String(msg.Key)
 	}
 
+	if len(msg.OrderingKey) != 0 {
+		smm.OrderingKey = []byte(msg.OrderingKey)
+	}
+
 	if msg.Properties != nil {
 		smm.Properties = internal.ConvertFromStringMap(msg.Properties)
 	}