You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sh...@apache.org on 2022/03/11 07:50:17 UTC
[pulsar-client-go] 01/01: [PIP 90] go client retrieve broker metadata
This is an automated email from the ASF dual-hosted git repository.
shoothzj pushed a commit to branch exposing-broker-metadata
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 5acad7b37030f2c675e9eeee846fc613b12a1fcf
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Fri Mar 11 15:46:59 2022 +0800
[PIP 90] go client retrieve broker metadata
---
examples/consumer/consumer.go | 1 -
pulsar/consumer_partition.go | 15 +++++++++++++--
pulsar/impl_message.go | 5 +++++
pulsar/internal/buffer.go | 6 ++++++
pulsar/internal/commands.go | 20 ++++++++++++++++++--
pulsar/internal/commands_test.go | 20 ++++++++++++++++++++
pulsar/internal/connection.go | 5 +++--
pulsar/message.go | 2 ++
8 files changed, 67 insertions(+), 7 deletions(-)
diff --git a/examples/consumer/consumer.go b/examples/consumer/consumer.go
index 0b819bc..857a891 100644
--- a/examples/consumer/consumer.go
+++ b/examples/consumer/consumer.go
@@ -48,7 +48,6 @@ func main() {
if err != nil {
log.Fatal(err)
}
-
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 04a39c5..ea1ae4e 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -498,12 +498,17 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
pbMsgID := response.GetMessageId()
reader := internal.NewMessageReader(headersAndPayload)
+ brokerMetadata, err := reader.ReadBrokerMetadata()
+ if err != nil {
+ // todo optimize use more appropriate error codes
+ pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_BatchDeSerializeError)
+ return err
+ }
msgMeta, err := reader.ReadMessageMetadata()
if err != nil {
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_ChecksumMismatch)
return err
}
-
decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
// error decrypting the payload
if err != nil {
@@ -597,7 +602,11 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
pc.AckID(msgID)
continue
}
-
+ var messageIndex *uint64
+ if brokerMetadata != nil {
+ aux := brokerMetadata.GetIndex() - uint64(numMsgs) + uint64(i) + 1
+ messageIndex = &aux
+ }
// set the consumer so we know how to ack the message id
msgID.consumer = pc
var msg *message
@@ -616,6 +625,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
replicatedFrom: msgMeta.GetReplicatedFrom(),
redeliveryCount: response.GetRedeliveryCount(),
orderingKey: string(smm.OrderingKey),
+ index: messageIndex,
}
} else {
msg = &message{
@@ -631,6 +641,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
replicationClusters: msgMeta.GetReplicateTo(),
replicatedFrom: msgMeta.GetReplicatedFrom(),
redeliveryCount: response.GetRedeliveryCount(),
+ index: messageIndex,
}
}
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index a9809aa..d4e3025 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -241,6 +241,7 @@ type message struct {
redeliveryCount uint32
schema Schema
encryptionContext *EncryptionContext
+ index *uint64
}
func (msg *message) Topic() string {
@@ -299,6 +300,10 @@ func (msg *message) GetEncryptionContext() *EncryptionContext {
return msg.encryptionContext
}
+func (msg *message) Index() *uint64 {
+ return msg.index
+}
+
func newAckTracker(size int) *ackTracker {
var batchIDs *big.Int
if size <= 64 {
diff --git a/pulsar/internal/buffer.go b/pulsar/internal/buffer.go
index f3b8fe6..b3e23fb 100644
--- a/pulsar/internal/buffer.go
+++ b/pulsar/internal/buffer.go
@@ -38,6 +38,8 @@ type Buffer interface {
Read(size uint32) []byte
+ Skip(size uint32)
+
Get(readerIndex uint32, size uint32) []byte
ReadableSlice() []byte
@@ -122,6 +124,10 @@ func (b *buffer) Read(size uint32) []byte {
return res
}
+func (b *buffer) Skip(size uint32) {
+ b.readerIdx += size
+}
+
func (b *buffer) Get(readerIdx uint32, size uint32) []byte {
return b.data[readerIdx : readerIdx+size]
}
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index b91c0b6..7fd1885 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -18,6 +18,7 @@
package internal
import (
+ "encoding/binary"
"errors"
"fmt"
@@ -34,8 +35,9 @@ const (
// MessageFramePadding is for metadata and other frame headers
MessageFramePadding = 10 * 1024
// MaxFrameSize limit the maximum size that pulsar allows for messages to be sent.
- MaxFrameSize = MaxMessageSize + MessageFramePadding
- magicCrc32c uint16 = 0x0e01
+ MaxFrameSize = MaxMessageSize + MessageFramePadding
+ magicCrc32c uint16 = 0x0e01
+ magicBrokerEntryMetadata uint16 = 0x0e02
)
// ErrCorruptedMessage is the error returned by ReadMessageData when it has detected corrupted data.
@@ -119,6 +121,20 @@ func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) {
return &meta, nil
}
+func (r *MessageReader) ReadBrokerMetadata() (*pb.BrokerEntryMetadata, error) {
+ magicNumber := binary.BigEndian.Uint16(r.buffer.Get(r.buffer.ReaderIndex(), 2))
+ if magicNumber != magicBrokerEntryMetadata {
+ return nil, nil
+ }
+ r.buffer.Skip(2)
+ size := r.buffer.ReadUint32()
+ var brokerEntryMetadata pb.BrokerEntryMetadata
+ if err := proto.Unmarshal(r.buffer.Read(size), &brokerEntryMetadata); err != nil {
+ return nil, err
+ }
+ return &brokerEntryMetadata, nil
+}
+
func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) {
if r.buffer.ReadableBytes() == 0 && r.buffer.Capacity() > 0 {
return nil, nil, ErrEOM
diff --git a/pulsar/internal/commands_test.go b/pulsar/internal/commands_test.go
index b43335a..c290c2c 100644
--- a/pulsar/internal/commands_test.go
+++ b/pulsar/internal/commands_test.go
@@ -18,6 +18,7 @@
package internal
import (
+ "fmt"
"testing"
"github.com/stretchr/testify/assert"
@@ -70,6 +71,20 @@ func TestReadMessageMetadata(t *testing.T) {
assert.Equal(t, 10, int(meta.GetNumMessagesInBatch()))
}
+func TestReadBrokerEntryMetadata(t *testing.T) {
+ // read old style message (not batched)
+ reader := NewMessageReaderFromArray(brokerEntryMeta)
+ meta, err := reader.ReadBrokerMetadata()
+ if err != nil {
+ t.Fatal(err)
+ }
+ var expectedBrokerTimestamp uint64 = 1646983036054
+ assert.Equal(t, expectedBrokerTimestamp, *meta.BrokerTimestamp)
+ var expectedIndex uint64 = 5
+ assert.Equal(t, expectedIndex, *meta.Index)
+ fmt.Println(meta)
+}
+
func TestReadMessageOldFormat(t *testing.T) {
reader := NewMessageReaderFromArray(rawCompatSingleMessage)
_, err := reader.ReadMessageMetadata()
@@ -210,3 +225,8 @@ var rawBatchMessage10 = []byte{
0x28, 0x05, 0x40, 0x09, 0x68, 0x65, 0x6c, 0x6c,
0x6f,
}
+
+var brokerEntryMeta = []byte{
+ 0x0e, 0x02, 0x00, 0x00, 0x00, 0x09, 0x08, 0x96,
+ 0xf9, 0xda, 0xbe, 0xf7, 0x2f, 0x10, 0x05,
+}
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 1e2009c..a025abf 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -43,7 +43,7 @@ const (
PulsarVersion = "0.1"
ClientVersionString = "Pulsar Go " + PulsarVersion
- PulsarProtocolVersion = int32(pb.ProtocolVersion_v13)
+ PulsarProtocolVersion = int32(pb.ProtocolVersion_v18)
)
type TLSOptions struct {
@@ -292,7 +292,8 @@ func (c *connection) doHandshake() bool {
AuthMethodName: proto.String(c.auth.Name()),
AuthData: authData,
FeatureFlags: &pb.FeatureFlags{
- SupportsAuthRefresh: proto.Bool(true),
+ SupportsAuthRefresh: proto.Bool(true),
+ SupportsBrokerEntryMetadata: proto.Bool(true),
},
}
diff --git a/pulsar/message.go b/pulsar/message.go
index 3779caf..f3135b1 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -121,6 +121,8 @@ type Message interface {
// GetEncryptionContext returns the ecryption context of the message.
// It will be used by the application to parse the undecrypted message.
GetEncryptionContext() *EncryptionContext
+
+ Index() *uint64
}
// MessageID identifier for a particular message