You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sh...@apache.org on 2022/03/11 07:50:17 UTC

[pulsar-client-go] 01/01: [PIP 90] go client retrieve broker metadata

This is an automated email from the ASF dual-hosted git repository.

shoothzj pushed a commit to branch exposing-broker-metadata
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 5acad7b37030f2c675e9eeee846fc613b12a1fcf
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Fri Mar 11 15:46:59 2022 +0800

    [PIP 90] go client retrieve broker metadata
---
 examples/consumer/consumer.go    |  1 -
 pulsar/consumer_partition.go     | 15 +++++++++++++--
 pulsar/impl_message.go           |  5 +++++
 pulsar/internal/buffer.go        |  6 ++++++
 pulsar/internal/commands.go      | 20 ++++++++++++++++++--
 pulsar/internal/commands_test.go | 20 ++++++++++++++++++++
 pulsar/internal/connection.go    |  5 +++--
 pulsar/message.go                |  2 ++
 8 files changed, 67 insertions(+), 7 deletions(-)

diff --git a/examples/consumer/consumer.go b/examples/consumer/consumer.go
index 0b819bc..857a891 100644
--- a/examples/consumer/consumer.go
+++ b/examples/consumer/consumer.go
@@ -48,7 +48,6 @@ func main() {
 		if err != nil {
 			log.Fatal(err)
 		}
-
 		fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
 			msg.ID(), string(msg.Payload()))
 
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 04a39c5..ea1ae4e 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -498,12 +498,17 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 	pbMsgID := response.GetMessageId()
 
 	reader := internal.NewMessageReader(headersAndPayload)
+	brokerMetadata, err := reader.ReadBrokerMetadata()
+	if err != nil {
+		// TODO: use a more appropriate ack validation error code for broker metadata parse failures
+		pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_BatchDeSerializeError)
+		return err
+	}
 	msgMeta, err := reader.ReadMessageMetadata()
 	if err != nil {
 		pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_ChecksumMismatch)
 		return err
 	}
-
 	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
 	// error decrypting the payload
 	if err != nil {
@@ -597,7 +602,11 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 			pc.AckID(msgID)
 			continue
 		}
-
+		var messageIndex *uint64
+		if brokerMetadata != nil {
+			aux := brokerMetadata.GetIndex() - uint64(numMsgs) + uint64(i) + 1
+			messageIndex = &aux
+		}
 		// set the consumer so we know how to ack the message id
 		msgID.consumer = pc
 		var msg *message
@@ -616,6 +625,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 				replicatedFrom:      msgMeta.GetReplicatedFrom(),
 				redeliveryCount:     response.GetRedeliveryCount(),
 				orderingKey:         string(smm.OrderingKey),
+				index:               messageIndex,
 			}
 		} else {
 			msg = &message{
@@ -631,6 +641,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 				replicationClusters: msgMeta.GetReplicateTo(),
 				replicatedFrom:      msgMeta.GetReplicatedFrom(),
 				redeliveryCount:     response.GetRedeliveryCount(),
+				index:               messageIndex,
 			}
 		}
 
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index a9809aa..d4e3025 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -241,6 +241,7 @@ type message struct {
 	redeliveryCount     uint32
 	schema              Schema
 	encryptionContext   *EncryptionContext
+	index               *uint64
 }
 
 func (msg *message) Topic() string {
@@ -299,6 +300,10 @@ func (msg *message) GetEncryptionContext() *EncryptionContext {
 	return msg.encryptionContext
 }
 
+func (msg *message) Index() *uint64 {
+	return msg.index
+}
+
 func newAckTracker(size int) *ackTracker {
 	var batchIDs *big.Int
 	if size <= 64 {
diff --git a/pulsar/internal/buffer.go b/pulsar/internal/buffer.go
index f3b8fe6..b3e23fb 100644
--- a/pulsar/internal/buffer.go
+++ b/pulsar/internal/buffer.go
@@ -38,6 +38,8 @@ type Buffer interface {
 
 	Read(size uint32) []byte
 
+	Skip(size uint32)
+
 	Get(readerIndex uint32, size uint32) []byte
 
 	ReadableSlice() []byte
@@ -122,6 +124,10 @@ func (b *buffer) Read(size uint32) []byte {
 	return res
 }
 
+func (b *buffer) Skip(size uint32) {
+	b.readerIdx += size
+}
+
 func (b *buffer) Get(readerIdx uint32, size uint32) []byte {
 	return b.data[readerIdx : readerIdx+size]
 }
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index b91c0b6..7fd1885 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -18,6 +18,7 @@
 package internal
 
 import (
+	"encoding/binary"
 	"errors"
 	"fmt"
 
@@ -34,8 +35,9 @@ const (
 	// MessageFramePadding is for metadata and other frame headers
 	MessageFramePadding = 10 * 1024
 	// MaxFrameSize limit the maximum size that pulsar allows for messages to be sent.
-	MaxFrameSize        = MaxMessageSize + MessageFramePadding
-	magicCrc32c  uint16 = 0x0e01
+	MaxFrameSize                    = MaxMessageSize + MessageFramePadding
+	magicCrc32c              uint16 = 0x0e01
+	magicBrokerEntryMetadata uint16 = 0x0e02
 )
 
 // ErrCorruptedMessage is the error returned by ReadMessageData when it has detected corrupted data.
@@ -119,6 +121,20 @@ func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) {
 	return &meta, nil
 }
 
+func (r *MessageReader) ReadBrokerMetadata() (*pb.BrokerEntryMetadata, error) {
+	magicNumber := binary.BigEndian.Uint16(r.buffer.Get(r.buffer.ReaderIndex(), 2))
+	if magicNumber != magicBrokerEntryMetadata {
+		return nil, nil
+	}
+	r.buffer.Skip(2)
+	size := r.buffer.ReadUint32()
+	var brokerEntryMetadata pb.BrokerEntryMetadata
+	if err := proto.Unmarshal(r.buffer.Read(size), &brokerEntryMetadata); err != nil {
+		return nil, err
+	}
+	return &brokerEntryMetadata, nil
+}
+
 func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) {
 	if r.buffer.ReadableBytes() == 0 && r.buffer.Capacity() > 0 {
 		return nil, nil, ErrEOM
diff --git a/pulsar/internal/commands_test.go b/pulsar/internal/commands_test.go
index b43335a..c290c2c 100644
--- a/pulsar/internal/commands_test.go
+++ b/pulsar/internal/commands_test.go
@@ -18,6 +18,7 @@
 package internal
 
 import (
+	"fmt"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
@@ -70,6 +71,20 @@ func TestReadMessageMetadata(t *testing.T) {
 	assert.Equal(t, 10, int(meta.GetNumMessagesInBatch()))
 }
 
+func TestReadBrokerEntryMetadata(t *testing.T) {
+	// read a frame that begins with the broker entry metadata magic number (0x0e02)
+	reader := NewMessageReaderFromArray(brokerEntryMeta)
+	meta, err := reader.ReadBrokerMetadata()
+	if err != nil {
+		t.Fatal(err)
+	}
+	var expectedBrokerTimestamp uint64 = 1646983036054
+	assert.Equal(t, expectedBrokerTimestamp, *meta.BrokerTimestamp)
+	var expectedIndex uint64 = 5
+	assert.Equal(t, expectedIndex, *meta.Index)
+	fmt.Println(meta)
+}
+
 func TestReadMessageOldFormat(t *testing.T) {
 	reader := NewMessageReaderFromArray(rawCompatSingleMessage)
 	_, err := reader.ReadMessageMetadata()
@@ -210,3 +225,8 @@ var rawBatchMessage10 = []byte{
 	0x28, 0x05, 0x40, 0x09, 0x68, 0x65, 0x6c, 0x6c,
 	0x6f,
 }
+
+var brokerEntryMeta = []byte{
+	0x0e, 0x02, 0x00, 0x00, 0x00, 0x09, 0x08, 0x96,
+	0xf9, 0xda, 0xbe, 0xf7, 0x2f, 0x10, 0x05,
+}
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 1e2009c..a025abf 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -43,7 +43,7 @@ const (
 	PulsarVersion       = "0.1"
 	ClientVersionString = "Pulsar Go " + PulsarVersion
 
-	PulsarProtocolVersion = int32(pb.ProtocolVersion_v13)
+	PulsarProtocolVersion = int32(pb.ProtocolVersion_v18)
 )
 
 type TLSOptions struct {
@@ -292,7 +292,8 @@ func (c *connection) doHandshake() bool {
 		AuthMethodName:  proto.String(c.auth.Name()),
 		AuthData:        authData,
 		FeatureFlags: &pb.FeatureFlags{
-			SupportsAuthRefresh: proto.Bool(true),
+			SupportsAuthRefresh:         proto.Bool(true),
+			SupportsBrokerEntryMetadata: proto.Bool(true),
 		},
 	}
 
diff --git a/pulsar/message.go b/pulsar/message.go
index 3779caf..f3135b1 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -121,6 +121,8 @@ type Message interface {
 	// GetEncryptionContext returns the ecryption context of the message.
 	// It will be used by the application to parse the undecrypted message.
 	GetEncryptionContext() *EncryptionContext
+
+	Index() *uint64
 }
 
 // MessageID identifier for a particular message