You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/11/03 15:41:28 UTC
[pulsar-client-go] branch master updated: Improve and fix message
parsing. (#80)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 94184ea Improve and fix message parsing. (#80)
94184ea is described below
commit 94184eab06c111a27f57aeefdfca7e5cffa9539f
Author: cckellogg <cc...@gmail.com>
AuthorDate: Sun Nov 3 07:41:24 2019 -0800
Improve and fix message parsing. (#80)
---
pulsar/impl_partition_consumer.go | 111 ++++++++------
pulsar/internal/commands.go | 294 +++++++++++++++-----------------------
pulsar/internal/commands_test.go | 179 +++++++++++++++++++++--
3 files changed, 351 insertions(+), 233 deletions(-)
diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
index 2d8800d..58b3ffe 100644
--- a/pulsar/impl_partition_consumer.go
+++ b/pulsar/impl_partition_consumer.go
@@ -24,12 +24,13 @@ import (
"sync"
"time"
- "github.com/apache/pulsar-client-go/pkg/pb"
- "github.com/apache/pulsar-client-go/pulsar/internal"
- "github.com/apache/pulsar-client-go/util"
"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
+
+ "github.com/apache/pulsar-client-go/pkg/pb"
+ "github.com/apache/pulsar-client-go/pulsar/internal"
+ "github.com/apache/pulsar-client-go/util"
)
const maxRedeliverUnacknowledged = 1000
@@ -600,59 +601,85 @@ func (pc *partitionConsumer) internalFlow(permits uint32) error {
}
func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error {
- msgID := response.GetMessageId()
+ pbMsgID := response.GetMessageId()
- id := newMessageID(int64(msgID.GetLedgerId()), int64(msgID.GetEntryId()),
- int(msgID.GetBatchIndex()), pc.partitionIdx)
+ reader := internal.NewMessageReader(headersAndPayload)
- msgMeta, payloadList, err := internal.ParseMessage(headersAndPayload)
+ msgMeta, err := reader.ReadMessageMetadata()
if err != nil {
- return fmt.Errorf("parse message error:%s", err)
- }
-
- for _, payload := range payloadList {
- msg := &message{
- publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
- eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
- key: msgMeta.GetPartitionKey(),
- properties: internal.ConvertToStringMap(msgMeta.GetProperties()),
- topic: pc.topic,
- msgID: id,
- payLoad: payload,
- }
+ // TODO send discardCorruptedMessage
+ return err
+ }
- consumerMsg := ConsumerMessage{
- Message: msg,
- Consumer: pc,
+ numMsgs := 1
+ if msgMeta.NumMessagesInBatch != nil {
+ numMsgs = int(msgMeta.GetNumMessagesInBatch())
+ }
+ for i := 0; i < numMsgs; i++ {
+ ssm, payload, err := reader.ReadMessage()
+ if err != nil {
+ // TODO send
+ return err
}
- select {
- case pc.subQueue <- consumerMsg:
- //Add messageId to redeliverMessages buffer, avoiding duplicates.
- newMid := response.GetMessageId()
- var dup bool
-
- pc.omu.Lock()
- for _, mid := range pc.redeliverMessages {
- if proto.Equal(mid, newMid) {
- dup = true
- break
- }
+ msgID := newMessageID(int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), i, pc.partitionIdx)
+ var msg Message
+ if ssm == nil {
+ msg = &message{
+ publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+ eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+ key: msgMeta.GetPartitionKey(),
+ properties: internal.ConvertToStringMap(msgMeta.GetProperties()),
+ topic: pc.topic,
+ msgID: msgID,
+ payLoad: payload,
}
-
- if !dup {
- pc.redeliverMessages = append(pc.redeliverMessages, newMid)
+ } else {
+ msg = &message{
+ publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+ eventTime: timeFromUnixTimestampMillis(ssm.GetEventTime()),
+ key: ssm.GetPartitionKey(),
+ properties: internal.ConvertToStringMap(ssm.GetProperties()),
+ topic: pc.topic,
+ msgID: msgID,
+ payLoad: payload,
}
- pc.omu.Unlock()
- continue
- default:
- return fmt.Errorf("consumer message channel on topic %s is full (capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel))
+ }
+
+ if err := pc.dispatchMessage(msg, pbMsgID); err != nil {
+ // TODO handle error
+ return err
}
}
return nil
}
+
+func (pc *partitionConsumer) dispatchMessage(msg Message, msgID *pb.MessageIdData) error {
+ select {
+ case pc.subQueue <- ConsumerMessage{Consumer:pc, Message:msg}:
+ //Add messageId to redeliverMessages buffer, avoiding duplicates.
+ var dup bool
+
+ pc.omu.Lock()
+ for _, mid := range pc.redeliverMessages {
+ if proto.Equal(mid, msgID) {
+ dup = true
+ break
+ }
+ }
+
+ if !dup {
+ pc.redeliverMessages = append(pc.redeliverMessages, msgID)
+ }
+ pc.omu.Unlock()
+ default:
+ return fmt.Errorf("consumer message channel on topic %s is full (capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel))
+ }
+ return nil
+}
+
type handleAck struct {
msgID MessageID
waitGroup *sync.WaitGroup
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 2800b92..7ed8e68 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -20,12 +20,14 @@ package internal
import (
"bytes"
"encoding/binary"
+ "errors"
"fmt"
- "io"
- "github.com/apache/pulsar-client-go/pkg/pb"
"github.com/golang/protobuf/proto"
+
log "github.com/sirupsen/logrus"
+
+ "github.com/apache/pulsar-client-go/pkg/pb"
)
const (
@@ -34,6 +36,114 @@ const (
magicCrc32c uint16 = 0x0e01
)
+// ErrCorruptedMessage is the error returned by ReadMessageMetadata when it has detected corrupted data:
+// the header's magic number does not match or the message metadata cannot be unmarshalled.
+// A too-short header or a checksum mismatch is currently reported with a separate, non-sentinel error.
+var ErrCorruptedMessage = errors.New("corrupted message")
+
+// ErrEOM is the error returned by ReadMessage when no more input is available.
+var ErrEOM = errors.New("EOF")
+
+func NewMessageReader(headersAndPayload []byte) *MessageReader {
+ return &MessageReader{
+ buffer: bytes.NewBuffer(headersAndPayload),
+ }
+}
+
+// MessageReader provides helper methods to parse
+// the metadata and messages from the binary format.
+// Wire format for a message:
+//
+// Old format (single message)
+// [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
+//
+// Batch format
+// [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [METADATA_SIZE][METADATA][PAYLOAD] [METADATA_SIZE][METADATA][PAYLOAD]
+//
+type MessageReader struct {
+ buffer *bytes.Buffer
+ // true if we are parsing a batched message - set after parsing the message metadata
+ batched bool
+}
+
+
+// readChecksum reads the 2-byte magic number and the 4-byte checksum from the message header.
+func (r *MessageReader) readChecksum() (uint32, error) {
+ if r.buffer.Len() < 6 {
+ return 0, errors.New("missing message header")
+ }
+ // read magic number
+ magicNumber := binary.BigEndian.Uint16(r.buffer.Next(2))
+ if magicNumber != magicCrc32c {
+ return 0, ErrCorruptedMessage
+ }
+ checksum := binary.BigEndian.Uint32(r.buffer.Next(4))
+ return checksum, nil
+}
+
+
+func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) {
+ // Wire format
+ // [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA]
+
+ // read checksum
+ checksum, err := r.readChecksum()
+ if err != nil {
+ return nil, err
+ }
+
+ // validate checksum
+ computedChecksum := Crc32cCheckSum(r.buffer.Bytes())
+ if checksum != computedChecksum {
+ return nil, fmt.Errorf("checksum mismatch received: 0x%x computed: 0x%x", checksum, computedChecksum)
+ }
+
+ size := int(binary.BigEndian.Uint32(r.buffer.Next(4)))
+ data := r.buffer.Next(size)
+ var meta pb.MessageMetadata
+ if err := proto.Unmarshal(data, &meta); err != nil {
+ return nil, ErrCorruptedMessage
+ }
+
+ if meta.NumMessagesInBatch != nil {
+ r.batched = true
+ }
+
+ return &meta, nil
+}
+
+func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) {
+ if r.buffer.Len() == 0 {
+ return nil, nil, ErrEOM
+ }
+ if !r.batched {
+ return r.readMessage()
+ }
+
+ return r.readSingleMessage()
+}
+
+func (r *MessageReader) readMessage() (*pb.SingleMessageMetadata, []byte, error) {
+ // Wire format
+ // [PAYLOAD]
+
+ return nil, r.buffer.Next(r.buffer.Len()), nil
+}
+
+func (r *MessageReader) readSingleMessage() (*pb.SingleMessageMetadata, []byte, error) {
+ // Wire format
+ // [METADATA_SIZE][METADATA][PAYLOAD]
+
+ size := int(binary.BigEndian.Uint32(r.buffer.Next(4)))
+ var meta pb.SingleMessageMetadata
+ if err := proto.Unmarshal(r.buffer.Next(size), &meta); err != nil {
+ return nil, nil, err
+ }
+
+ return &meta, r.buffer.Next(int(meta.GetPayloadSize())), nil
+}
+
+
func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand {
cmd := &pb.BaseCommand{
Type: &cmdType,
@@ -87,142 +197,6 @@ func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload [
wb.Write(payload)
}
-func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloadList [][]byte, err error) {
- // reusable buffer for 4-byte uint32s
- buf32 := make([]byte, 4)
- r := bytes.NewReader(headersAndPayload)
- // Wrap our reader so that we can only read
- // bytes from our frame
- lr := &io.LimitedReader{
- N: int64(len(headersAndPayload)),
- R: r,
- }
- // There are 3 possibilities for the following fields:
- // - EOF: If so, this is a "simple" command. No more parsing required.
- // - 2-byte magic number: Indicates the following 4 bytes are a checksum
- // - 4-byte metadata size
-
- // The message may optionally stop here. If so,
- // this is a "simple" command.
- if lr.N <= 0 {
- return nil, nil, nil
- }
-
- // Optionally, the next 2 bytes may be the magicNumber. If
- // so, it indicates that the following 4 bytes are a checksum.
- // If not, the following 2 bytes (plus the 2 bytes already read),
- // are the metadataSize, which is why a 4 byte buffer is used.
- if _, err = io.ReadFull(lr, buf32); err != nil {
- return nil, nil, err
- }
-
- // Check for magicNumber which indicates a checksum
- var chksum CheckSum
- var expectedChksum []byte
-
- magicNumber := make([]byte, 2)
- binary.BigEndian.PutUint16(magicNumber, magicCrc32c)
- if magicNumber[0] == buf32[0] && magicNumber[1] == buf32[1] {
- expectedChksum = make([]byte, 4)
-
- // We already read the 2-byte magicNumber and the
- // initial 2 bytes of the checksum
- expectedChksum[0] = buf32[2]
- expectedChksum[1] = buf32[3]
-
- // Read the remaining 2 bytes of the checksum
- if _, err = io.ReadFull(lr, expectedChksum[2:]); err != nil {
- return nil, nil, err
- }
-
- // Use a tee reader to compute the checksum
- // of everything consumed after this point
- lr.R = io.TeeReader(lr.R, &chksum)
-
- // Fill buffer with metadata size, which is what it
- // would already contain if there were no magic number / checksum
- if _, err = io.ReadFull(lr, buf32); err != nil {
- return nil, nil, err
- }
- }
-
- // Read metadataSize
- metadataSize := binary.BigEndian.Uint32(buf32)
- // guard against allocating large buffer
- if metadataSize > MaxFrameSize {
- return nil, nil, fmt.Errorf("frame metadata size (%d) "+
- "cannot b greater than max frame size (%d)", metadataSize, MaxFrameSize)
- }
-
- // Read protobuf encoded metadata
- metaBuf := make([]byte, metadataSize)
- if _, err = io.ReadFull(lr, metaBuf); err != nil {
- return nil, nil, err
- }
- msgMeta = new(pb.MessageMetadata)
- if err = proto.Unmarshal(metaBuf, msgMeta); err != nil {
- return nil, nil, err
- }
-
- numMsg := msgMeta.GetNumMessagesInBatch()
-
- if numMsg > 0 && msgMeta.NumMessagesInBatch != nil {
- payloads := make([]byte, lr.N)
- if _, err = io.ReadFull(lr, payloads); err != nil {
- return nil, nil, err
- }
-
- singleMessages, e := decodeBatchPayload(payloads, numMsg)
- if e != nil {
- return nil, nil, e
- }
-
- payloadList = make([][]byte, 0, numMsg)
- for _, singleMsg := range singleMessages {
- msgMeta.PartitionKey = singleMsg.SingleMeta.PartitionKey
- msgMeta.Properties = singleMsg.SingleMeta.Properties
- msgMeta.EventTime = singleMsg.SingleMeta.EventTime
- payloadList = append(payloadList, singleMsg.SinglePayload)
- }
-
- if err = computeChecksum(chksum, expectedChksum); err != nil {
- return nil, nil, err
- }
- return msgMeta, payloadList, nil
- }
- // Anything left in the frame is considered
- // the payload and can be any sequence of bytes.
- payloadList = make([][]byte, 0, 10)
- if lr.N > 0 {
- // guard against allocating large buffer
- if lr.N > MaxFrameSize {
- return nil, nil, fmt.Errorf("frame payload size (%d) cannot be greater than max frame size (%d)", lr.N, MaxFrameSize)
- }
-
- payload := make([]byte, lr.N)
- if _, err = io.ReadFull(lr, payload); err != nil {
- return nil, nil, err
- }
-
- payloadList = append(payloadList, payload)
- }
-
- if err = computeChecksum(chksum, expectedChksum); err != nil {
- return nil, nil, err
- }
-
- return msgMeta, payloadList, nil
-}
-
-func computeChecksum(chksum CheckSum, expectedChksum []byte) error {
- computed := chksum.compute()
- if !bytes.Equal(computed, expectedChksum) {
- return fmt.Errorf("checksum mismatch: computed (0x%X) does "+
- "not match given checksum (0x%X)", computed, expectedChksum)
- }
- return nil
-}
-
func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageMetadata, payload []byte) {
// Wire format
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
@@ -270,50 +244,6 @@ func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageM
wb.PutUint32(checksum, checksumIdx)
}
-// singleMessage represents one of the elements of the batch type payload
-type singleMessage struct {
- SingleMetaSize uint32
- SingleMeta *pb.SingleMessageMetadata
- SinglePayload []byte
-}
-
-// decodeBatchPayload parses the payload of the batch type
-// If the producer uses the batch function, msg.Payload will be a singleMessage array structure.
-func decodeBatchPayload(bp []byte, batchNum int32) ([]*singleMessage, error) {
- buf32 := make([]byte, 4)
- rdBuf := bytes.NewReader(bp)
- singleMsgList := make([]*singleMessage, 0, batchNum)
- for i := int32(0); i < batchNum; i++ {
- // singleMetaSize
- if _, err := io.ReadFull(rdBuf, buf32); err != nil {
- return nil, err
- }
- singleMetaSize := binary.BigEndian.Uint32(buf32)
-
- // singleMeta
- singleMetaBuf := make([]byte, singleMetaSize)
- if _, err := io.ReadFull(rdBuf, singleMetaBuf); err != nil {
- return nil, err
- }
- singleMeta := new(pb.SingleMessageMetadata)
- if err := proto.Unmarshal(singleMetaBuf, singleMeta); err != nil {
- return nil, err
- }
- // payload
- singlePayload := make([]byte, singleMeta.GetPayloadSize())
- if _, err := io.ReadFull(rdBuf, singlePayload); err != nil {
- return nil, err
- }
- singleMsg := &singleMessage{
- SingleMetaSize: singleMetaSize,
- SingleMeta: singleMeta,
- SinglePayload: singlePayload,
- }
-
- singleMsgList = append(singleMsgList, singleMsg)
- }
- return singleMsgList, nil
-}
// ConvertFromStringMap convert a string map to a KeyValue []byte
func ConvertFromStringMap(m map[string]string) []*pb.KeyValue {
diff --git a/pulsar/internal/commands_test.go b/pulsar/internal/commands_test.go
index 0102956..dad658e 100644
--- a/pulsar/internal/commands_test.go
+++ b/pulsar/internal/commands_test.go
@@ -38,19 +38,180 @@ func TestConvertStringMap(t *testing.T) {
assert.Equal(t, "2", m2["b"])
}
-func TestDecodeBatchPayload(t *testing.T) {
- // singleMsg = singleMetaSize(4 bytes) + singleMeta(var length) + payload
- singleMsg := []byte{0, 0, 0, 2, 24, 12, 104, 101, 108, 108, 111, 45, 112, 117, 108, 115, 97, 114}
- list, err := decodeBatchPayload(singleMsg, 1)
+
+func TestReadMessageMetadata(t *testing.T) {
+ // read old style message (not batched)
+ reader := NewMessageReader(rawCompatSingleMessage)
+ meta, err := reader.ReadMessageMetadata()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ props := meta.GetProperties()
+ assert.Equal(t, len(props), 2)
+ assert.Equal(t, "a", props[0].GetKey())
+ assert.Equal(t, "1", props[0].GetValue())
+ assert.Equal(t, "b", props[1].GetKey())
+ assert.Equal(t, "2", props[1].GetValue())
+
+ // read message with batch of 1
+ reader = NewMessageReader(rawBatchMessage1)
+ meta, err = reader.ReadMessageMetadata()
if err != nil {
t.Fatal(err)
}
- if get, want := len(list), 1; get != want {
- t.Errorf("want %v, but get %v", get, want)
+ assert.Equal(t, 1, int(meta.GetNumMessagesInBatch()))
+
+ // read message with batch of 10
+ reader = NewMessageReader(rawBatchMessage10)
+ meta, err = reader.ReadMessageMetadata()
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Equal(t, 10, int(meta.GetNumMessagesInBatch()))
+}
+
+
+func TestReadMessageOldFormat(t *testing.T) {
+ reader := NewMessageReader(rawCompatSingleMessage)
+ _, err := reader.ReadMessageMetadata()
+ if err != nil {
+ t.Fatal(err)
}
- m := list[0]
- if get, want := string(m.SinglePayload), "hello-pulsar"; get != want {
- t.Errorf("want %v, but get %v", get, want)
+ ssm, payload, err := reader.ReadMessage()
+ if err != nil {
+ t.Fatal(err)
}
+ // old message format does not have a single message metadata
+ assert.Equal(t, true, ssm == nil)
+ assert.Equal(t, "hello", string(payload))
+
+ _ , _, err = reader.ReadMessage()
+ assert.Equal(t, ErrEOM, err)
+}
+
+
+func TestReadMessagesBatchSize1(t *testing.T) {
+ reader := NewMessageReader(rawBatchMessage1)
+ meta, err := reader.ReadMessageMetadata()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ assert.Equal(t, 1, int(meta.GetNumMessagesInBatch()))
+ for i := 0; i < int(meta.GetNumMessagesInBatch()); i++ {
+ ssm, payload, err := reader.ReadMessage()
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Equal(t, true, ssm != nil)
+ assert.Equal(t, "hello", string(payload))
+ }
+
+ _ , _, err = reader.ReadMessage()
+ assert.Equal(t, ErrEOM, err)
+}
+
+
+func TestReadMessagesBatchSize10(t *testing.T) {
+ reader := NewMessageReader(rawBatchMessage10)
+ meta, err := reader.ReadMessageMetadata()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ assert.Equal(t, 10, int(meta.GetNumMessagesInBatch()))
+ for i := 0; i < int(meta.GetNumMessagesInBatch()); i++ {
+ ssm, payload, err := reader.ReadMessage()
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Equal(t, true, ssm != nil)
+ assert.Equal(t, "hello", string(payload))
+ }
+
+ _ , _, err = reader.ReadMessage()
+ assert.Equal(t, ErrEOM, err)
+}
+
+
+// Raw single message in old format
+// metadata properties:<key:"a" value:"1" > properties:<key:"b" value:"2" >
+// payload = "hello"
+var rawCompatSingleMessage = []byte{
+ 0x0e, 0x01, 0x08, 0x36, 0xb4, 0x66, 0x00, 0x00,
+ 0x00, 0x31, 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e,
+ 0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x37,
+ 0x34, 0x2d, 0x30, 0x10, 0x00, 0x18, 0xac, 0xef,
+ 0xe8, 0xa0, 0xe2, 0x2d, 0x22, 0x06, 0x0a, 0x01,
+ 0x61, 0x12, 0x01, 0x31, 0x22, 0x06, 0x0a, 0x01,
+ 0x62, 0x12, 0x01, 0x32, 0x48, 0x05, 0x60, 0x05,
+ 0x82, 0x01, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f,
+}
+
+// Message with batch of 1
+// single message metadata properties:<key:"a" value:"1" > properties:<key:"b" value:"2" >
+// payload = "hello"
+var rawBatchMessage1 = []byte{
+ 0x0e, 0x01, 0x1f, 0x80, 0x09, 0x68, 0x00, 0x00,
+ 0x00, 0x1f, 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e,
+ 0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x37,
+ 0x34, 0x2d, 0x31, 0x10, 0x00, 0x18, 0xdb, 0x80,
+ 0xf4, 0xa0, 0xe2, 0x2d, 0x58, 0x01, 0x82, 0x01,
+ 0x00, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a,
+ 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a,
+ 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28,
+ 0x05, 0x40, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f,
+}
+
+// Message with batch of 10
+// single message metadata properties:<key:"a" value:"1" > properties:<key:"b" value:"2" >
+// payload = "hello"
+var rawBatchMessage10 = []byte{
+ 0x0e, 0x01, 0x7b, 0x28, 0x8c, 0x08,
+ 0x00, 0x00, 0x00, 0x1f, 0x0a, 0x0f, 0x73, 0x74,
+ 0x61, 0x6e, 0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65,
+ 0x2d, 0x37, 0x34, 0x2d, 0x32, 0x10, 0x00, 0x18,
+ 0xd0, 0xc2, 0xfa, 0xa0, 0xe2, 0x2d, 0x58, 0x0a,
+ 0x82, 0x01, 0x00, 0x00, 0x00, 0x00, 0x16, 0x0a,
+ 0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a,
+ 0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18,
+ 0x05, 0x28, 0x05, 0x40, 0x00, 0x68, 0x65, 0x6c,
+ 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06,
+ 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06,
+ 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05,
+ 0x28, 0x05, 0x40, 0x01, 0x68, 0x65, 0x6c, 0x6c,
+ 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a,
+ 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a,
+ 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28,
+ 0x05, 0x40, 0x02, 0x68, 0x65, 0x6c, 0x6c, 0x6f,
+ 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01,
+ 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01,
+ 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28, 0x05,
+ 0x40, 0x03, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00,
+ 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61,
+ 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62,
+ 0x12, 0x01, 0x32, 0x18, 0x05, 0x28, 0x05, 0x40,
+ 0x04, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00,
+ 0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12,
+ 0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12,
+ 0x01, 0x32, 0x18, 0x05, 0x28, 0x05, 0x40, 0x05,
+ 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00,
+ 0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12, 0x01,
+ 0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12, 0x01,
+ 0x32, 0x18, 0x05, 0x28, 0x05, 0x40, 0x06, 0x68,
+ 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16,
+ 0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31,
+ 0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32,
+ 0x18, 0x05, 0x28, 0x05, 0x40, 0x07, 0x68, 0x65,
+ 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a,
+ 0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a,
+ 0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18,
+ 0x05, 0x28, 0x05, 0x40, 0x08, 0x68, 0x65, 0x6c,
+ 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06,
+ 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06,
+ 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05,
+ 0x28, 0x05, 0x40, 0x09, 0x68, 0x65, 0x6c, 0x6c,
+ 0x6f,
}