You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/11/03 15:41:28 UTC

[pulsar-client-go] branch master updated: Improve and fix message parsing. (#80)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 94184ea  Improve and fix message parsing. (#80)
94184ea is described below

commit 94184eab06c111a27f57aeefdfca7e5cffa9539f
Author: cckellogg <cc...@gmail.com>
AuthorDate: Sun Nov 3 07:41:24 2019 -0800

    Improve and fix message parsing. (#80)
---
 pulsar/impl_partition_consumer.go | 111 ++++++++------
 pulsar/internal/commands.go       | 294 +++++++++++++++-----------------------
 pulsar/internal/commands_test.go  | 179 +++++++++++++++++++++--
 3 files changed, 351 insertions(+), 233 deletions(-)

diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
index 2d8800d..58b3ffe 100644
--- a/pulsar/impl_partition_consumer.go
+++ b/pulsar/impl_partition_consumer.go
@@ -24,12 +24,13 @@ import (
 	"sync"
 	"time"
 
-	"github.com/apache/pulsar-client-go/pkg/pb"
-	"github.com/apache/pulsar-client-go/pulsar/internal"
-	"github.com/apache/pulsar-client-go/util"
 	"github.com/golang/protobuf/proto"
 
 	log "github.com/sirupsen/logrus"
+
+	"github.com/apache/pulsar-client-go/pkg/pb"
+	"github.com/apache/pulsar-client-go/pulsar/internal"
+	"github.com/apache/pulsar-client-go/util"
 )
 
 const maxRedeliverUnacknowledged = 1000
@@ -600,59 +601,85 @@ func (pc *partitionConsumer) internalFlow(permits uint32) error {
 }
 
 func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error {
-	msgID := response.GetMessageId()
+	pbMsgID := response.GetMessageId()
 
-	id := newMessageID(int64(msgID.GetLedgerId()), int64(msgID.GetEntryId()),
-		int(msgID.GetBatchIndex()), pc.partitionIdx)
+	reader := internal.NewMessageReader(headersAndPayload)
 
-	msgMeta, payloadList, err := internal.ParseMessage(headersAndPayload)
+	msgMeta, err := reader.ReadMessageMetadata()
 	if err != nil {
-		return fmt.Errorf("parse message error:%s", err)
-	}
-
-	for _, payload := range payloadList {
-		msg := &message{
-			publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
-			eventTime:   timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
-			key:         msgMeta.GetPartitionKey(),
-			properties:  internal.ConvertToStringMap(msgMeta.GetProperties()),
-			topic:       pc.topic,
-			msgID:       id,
-			payLoad:     payload,
-		}
+		// TODO send discardCorruptedMessage
+		return err
+	}
 
-		consumerMsg := ConsumerMessage{
-			Message:  msg,
-			Consumer: pc,
+	numMsgs := 1
+	if msgMeta.NumMessagesInBatch != nil {
+		numMsgs = int(msgMeta.GetNumMessagesInBatch())
+	}
+	for i := 0; i < numMsgs; i++ {
+		ssm, payload, err := reader.ReadMessage()
+		if err != nil {
+			// TODO send
+			return err
 		}
 
-		select {
-		case pc.subQueue <- consumerMsg:
-			//Add messageId to redeliverMessages buffer, avoiding duplicates.
-			newMid := response.GetMessageId()
-			var dup bool
-
-			pc.omu.Lock()
-			for _, mid := range pc.redeliverMessages {
-				if proto.Equal(mid, newMid) {
-					dup = true
-					break
-				}
+		msgID := newMessageID(int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), i, pc.partitionIdx)
+		var msg Message
+		if ssm == nil {
+			msg = &message{
+				publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+				eventTime:   timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+				key:         msgMeta.GetPartitionKey(),
+				properties:  internal.ConvertToStringMap(msgMeta.GetProperties()),
+				topic:       pc.topic,
+				msgID:       msgID,
+				payLoad:     payload,
 			}
-
-			if !dup {
-				pc.redeliverMessages = append(pc.redeliverMessages, newMid)
+		} else {
+			msg = &message{
+				publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+				eventTime:   timeFromUnixTimestampMillis(ssm.GetEventTime()),
+				key:         ssm.GetPartitionKey(),
+				properties:  internal.ConvertToStringMap(ssm.GetProperties()),
+				topic:       pc.topic,
+				msgID:       msgID,
+				payLoad:     payload,
 			}
-			pc.omu.Unlock()
-			continue
-		default:
-			return fmt.Errorf("consumer message channel on topic %s is full (capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel))
+		}
+
+		if err := pc.dispatchMessage(msg, pbMsgID); err != nil {
+			// TODO handle error
+			return err
 		}
 	}
 
 	return nil
 }
 
+
+func (pc *partitionConsumer) dispatchMessage(msg Message, msgID *pb.MessageIdData) error {
+	select {
+	case pc.subQueue <- ConsumerMessage{Consumer:pc, Message:msg}:
+		//Add messageId to redeliverMessages buffer, avoiding duplicates.
+		var dup bool
+
+		pc.omu.Lock()
+		for _, mid := range pc.redeliverMessages {
+			if proto.Equal(mid, msgID) {
+				dup = true
+				break
+			}
+		}
+
+		if !dup {
+			pc.redeliverMessages = append(pc.redeliverMessages, msgID)
+		}
+		pc.omu.Unlock()
+	default:
+		return fmt.Errorf("consumer message channel on topic %s is full (capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel))
+	}
+	return nil
+}
+
 type handleAck struct {
 	msgID     MessageID
 	waitGroup *sync.WaitGroup
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 2800b92..7ed8e68 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -20,12 +20,14 @@ package internal
 import (
 	"bytes"
 	"encoding/binary"
+	"errors"
 	"fmt"
-	"io"
 
-	"github.com/apache/pulsar-client-go/pkg/pb"
 	"github.com/golang/protobuf/proto"
+
 	log "github.com/sirupsen/logrus"
+
+	"github.com/apache/pulsar-client-go/pkg/pb"
 )
 
 const (
@@ -34,6 +36,114 @@ const (
 	magicCrc32c  uint16 = 0x0e01
 )
 
+// ErrCorruptedMessage is the error returned by ReadMessageMetadata when it has detected corrupted data.
+// The data is considered corrupted if it is missing a header, has a checksum mismatch, or there
+// was an error when unmarshalling the message metadata.
+var ErrCorruptedMessage = errors.New("corrupted message")
+
+// ErrEOM is the error returned by ReadMessage when no more input is available.
+var ErrEOM = errors.New("EOF")
+
+func NewMessageReader(headersAndPayload []byte) *MessageReader {
+	return &MessageReader{
+		buffer: bytes.NewBuffer(headersAndPayload),
+	}
+}
+
+// MessageReader provides helper methods to parse
+// the metadata and messages from the binary format.
+// Wire format for a message:
+//
+// Old format (single message)
+// [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
+//
+// Batch format
+// [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [METADATA_SIZE][METADATA][PAYLOAD] [METADATA_SIZE][METADATA][PAYLOAD]
+//
+type MessageReader struct {
+	buffer *bytes.Buffer
+	// true if we are parsing a batched message - set after parsing the message metadata
+	batched bool
+}
+
+
+// readChecksum reads the magic number and the checksum from the message header.
+func (r *MessageReader) readChecksum() (uint32, error) {
+	if r.buffer.Len() < 6 {
+		return 0, errors.New("missing message header")
+	}
+	// read the magic number
+	magicNumber := binary.BigEndian.Uint16(r.buffer.Next(2))
+	if magicNumber != magicCrc32c {
+		return 0, ErrCorruptedMessage
+	}
+	checksum := binary.BigEndian.Uint32(r.buffer.Next(4))
+	return checksum, nil
+}
+
+
+func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) {
+	// Wire format
+	// [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA]
+
+	// read checksum
+	checksum, err := r.readChecksum()
+	if err != nil {
+		return nil, err
+	}
+
+	// validate checksum
+	computedChecksum := Crc32cCheckSum(r.buffer.Bytes())
+	if checksum != computedChecksum {
+		return nil, fmt.Errorf("checksum mismatch received: 0x%x computed: 0x%x", checksum, computedChecksum)
+	}
+
+	size := int(binary.BigEndian.Uint32(r.buffer.Next(4)))
+	data := r.buffer.Next(size)
+	var meta pb.MessageMetadata
+	if err := proto.Unmarshal(data, &meta); err != nil {
+		return nil, ErrCorruptedMessage
+	}
+
+	if meta.NumMessagesInBatch != nil {
+		r.batched = true
+	}
+
+	return &meta, nil
+}
+
+func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) {
+	if r.buffer.Len() == 0 {
+		return nil, nil, ErrEOM
+	}
+	if !r.batched {
+		return r.readMessage()
+	}
+
+	return r.readSingleMessage()
+}
+
+func (r *MessageReader) readMessage() (*pb.SingleMessageMetadata, []byte, error) {
+	// Wire format
+	// [PAYLOAD]
+
+	return nil, r.buffer.Next(r.buffer.Len()), nil
+}
+
+func (r *MessageReader) readSingleMessage() (*pb.SingleMessageMetadata, []byte, error) {
+	// Wire format
+	// [METADATA_SIZE][METADATA][PAYLOAD]
+
+	size := int(binary.BigEndian.Uint32(r.buffer.Next(4)))
+	var meta pb.SingleMessageMetadata
+	if err := proto.Unmarshal(r.buffer.Next(size), &meta); err != nil {
+		return nil, nil, err
+	}
+
+	return &meta, r.buffer.Next(int(meta.GetPayloadSize())), nil
+}
+
+
 func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand {
 	cmd := &pb.BaseCommand{
 		Type: &cmdType,
@@ -87,142 +197,6 @@ func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload [
 	wb.Write(payload)
 }
 
-func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloadList [][]byte, err error) {
-	// reusable buffer for 4-byte uint32s
-	buf32 := make([]byte, 4)
-	r := bytes.NewReader(headersAndPayload)
-	// Wrap our reader so that we can only read
-	// bytes from our frame
-	lr := &io.LimitedReader{
-		N: int64(len(headersAndPayload)),
-		R: r,
-	}
-	// There are 3 possibilities for the following fields:
-	//  - EOF: If so, this is a "simple" command. No more parsing required.
-	//  - 2-byte magic number: Indicates the following 4 bytes are a checksum
-	//  - 4-byte metadata size
-
-	// The message may optionally stop here. If so,
-	// this is a "simple" command.
-	if lr.N <= 0 {
-		return nil, nil, nil
-	}
-
-	// Optionally, the next 2 bytes may be the magicNumber. If
-	// so, it indicates that the following 4 bytes are a checksum.
-	// If not, the following 2 bytes (plus the 2 bytes already read),
-	// are the metadataSize, which is why a 4 byte buffer is used.
-	if _, err = io.ReadFull(lr, buf32); err != nil {
-		return nil, nil, err
-	}
-
-	// Check for magicNumber which indicates a checksum
-	var chksum CheckSum
-	var expectedChksum []byte
-
-	magicNumber := make([]byte, 2)
-	binary.BigEndian.PutUint16(magicNumber, magicCrc32c)
-	if magicNumber[0] == buf32[0] && magicNumber[1] == buf32[1] {
-		expectedChksum = make([]byte, 4)
-
-		// We already read the 2-byte magicNumber and the
-		// initial 2 bytes of the checksum
-		expectedChksum[0] = buf32[2]
-		expectedChksum[1] = buf32[3]
-
-		// Read the remaining 2 bytes of the checksum
-		if _, err = io.ReadFull(lr, expectedChksum[2:]); err != nil {
-			return nil, nil, err
-		}
-
-		// Use a tee reader to compute the checksum
-		// of everything consumed after this point
-		lr.R = io.TeeReader(lr.R, &chksum)
-
-		// Fill buffer with metadata size, which is what it
-		// would already contain if there were no magic number / checksum
-		if _, err = io.ReadFull(lr, buf32); err != nil {
-			return nil, nil, err
-		}
-	}
-
-	// Read metadataSize
-	metadataSize := binary.BigEndian.Uint32(buf32)
-	// guard against allocating large buffer
-	if metadataSize > MaxFrameSize {
-		return nil, nil, fmt.Errorf("frame metadata size (%d) "+
-			"cannot b greater than max frame size (%d)", metadataSize, MaxFrameSize)
-	}
-
-	// Read protobuf encoded metadata
-	metaBuf := make([]byte, metadataSize)
-	if _, err = io.ReadFull(lr, metaBuf); err != nil {
-		return nil, nil, err
-	}
-	msgMeta = new(pb.MessageMetadata)
-	if err = proto.Unmarshal(metaBuf, msgMeta); err != nil {
-		return nil, nil, err
-	}
-
-	numMsg := msgMeta.GetNumMessagesInBatch()
-
-	if numMsg > 0 && msgMeta.NumMessagesInBatch != nil {
-		payloads := make([]byte, lr.N)
-		if _, err = io.ReadFull(lr, payloads); err != nil {
-			return nil, nil, err
-		}
-
-		singleMessages, e := decodeBatchPayload(payloads, numMsg)
-		if e != nil {
-			return nil, nil, e
-		}
-
-		payloadList = make([][]byte, 0, numMsg)
-		for _, singleMsg := range singleMessages {
-			msgMeta.PartitionKey = singleMsg.SingleMeta.PartitionKey
-			msgMeta.Properties = singleMsg.SingleMeta.Properties
-			msgMeta.EventTime = singleMsg.SingleMeta.EventTime
-			payloadList = append(payloadList, singleMsg.SinglePayload)
-		}
-
-		if err = computeChecksum(chksum, expectedChksum); err != nil {
-			return nil, nil, err
-		}
-		return msgMeta, payloadList, nil
-	}
-	// Anything left in the frame is considered
-	// the payload and can be any sequence of bytes.
-	payloadList = make([][]byte, 0, 10)
-	if lr.N > 0 {
-		// guard against allocating large buffer
-		if lr.N > MaxFrameSize {
-			return nil, nil, fmt.Errorf("frame payload size (%d) cannot be greater than max frame size (%d)", lr.N, MaxFrameSize)
-		}
-
-		payload := make([]byte, lr.N)
-		if _, err = io.ReadFull(lr, payload); err != nil {
-			return nil, nil, err
-		}
-
-		payloadList = append(payloadList, payload)
-	}
-
-	if err = computeChecksum(chksum, expectedChksum); err != nil {
-		return nil, nil, err
-	}
-
-	return msgMeta, payloadList, nil
-}
-
-func computeChecksum(chksum CheckSum, expectedChksum []byte) error {
-	computed := chksum.compute()
-	if !bytes.Equal(computed, expectedChksum) {
-		return fmt.Errorf("checksum mismatch: computed (0x%X) does "+
-			"not match given checksum (0x%X)", computed, expectedChksum)
-	}
-	return nil
-}
-
 func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageMetadata, payload []byte) {
 	// Wire format
 	// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
@@ -270,50 +244,6 @@ func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageM
 	wb.PutUint32(checksum, checksumIdx)
 }
 
-// singleMessage represents one of the elements of the batch type payload
-type singleMessage struct {
-	SingleMetaSize uint32
-	SingleMeta     *pb.SingleMessageMetadata
-	SinglePayload  []byte
-}
-
-// decodeBatchPayload parses the payload of the batch type
-// If the producer uses the batch function, msg.Payload will be a singleMessage array structure.
-func decodeBatchPayload(bp []byte, batchNum int32) ([]*singleMessage, error) {
-	buf32 := make([]byte, 4)
-	rdBuf := bytes.NewReader(bp)
-	singleMsgList := make([]*singleMessage, 0, batchNum)
-	for i := int32(0); i < batchNum; i++ {
-		// singleMetaSize
-		if _, err := io.ReadFull(rdBuf, buf32); err != nil {
-			return nil, err
-		}
-		singleMetaSize := binary.BigEndian.Uint32(buf32)
-
-		// singleMeta
-		singleMetaBuf := make([]byte, singleMetaSize)
-		if _, err := io.ReadFull(rdBuf, singleMetaBuf); err != nil {
-			return nil, err
-		}
-		singleMeta := new(pb.SingleMessageMetadata)
-		if err := proto.Unmarshal(singleMetaBuf, singleMeta); err != nil {
-			return nil, err
-		}
-		// payload
-		singlePayload := make([]byte, singleMeta.GetPayloadSize())
-		if _, err := io.ReadFull(rdBuf, singlePayload); err != nil {
-			return nil, err
-		}
-		singleMsg := &singleMessage{
-			SingleMetaSize: singleMetaSize,
-			SingleMeta:     singleMeta,
-			SinglePayload:  singlePayload,
-		}
-
-		singleMsgList = append(singleMsgList, singleMsg)
-	}
-	return singleMsgList, nil
-}
 
 // ConvertFromStringMap convert a string map to a KeyValue []byte
 func ConvertFromStringMap(m map[string]string) []*pb.KeyValue {
diff --git a/pulsar/internal/commands_test.go b/pulsar/internal/commands_test.go
index 0102956..dad658e 100644
--- a/pulsar/internal/commands_test.go
+++ b/pulsar/internal/commands_test.go
@@ -38,19 +38,180 @@ func TestConvertStringMap(t *testing.T) {
 	assert.Equal(t, "2", m2["b"])
 }
 
-func TestDecodeBatchPayload(t *testing.T) {
-	// singleMsg = singleMetaSize(4  bytes) + singleMeta(var length) + payload
-	singleMsg := []byte{0, 0, 0, 2, 24, 12, 104, 101, 108, 108, 111, 45, 112, 117, 108, 115, 97, 114}
-	list, err := decodeBatchPayload(singleMsg, 1)
+
+func TestReadMessageMetadata(t *testing.T) {
+	// read old style message (not batched)
+	reader := NewMessageReader(rawCompatSingleMessage)
+	meta, err := reader.ReadMessageMetadata()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	props := meta.GetProperties()
+	assert.Equal(t, len(props), 2)
+	assert.Equal(t, "a", props[0].GetKey())
+	assert.Equal(t, "1", props[0].GetValue())
+	assert.Equal(t, "b", props[1].GetKey())
+	assert.Equal(t, "2", props[1].GetValue())
+
+	// read message with batch of 1
+	reader = NewMessageReader(rawBatchMessage1)
+	meta, err = reader.ReadMessageMetadata()
 	if err != nil {
 		t.Fatal(err)
 	}
-	if get, want := len(list), 1; get != want {
-		t.Errorf("want %v, but get %v", get, want)
+	assert.Equal(t, 1, int(meta.GetNumMessagesInBatch()))
+
+	// read message with batch of 10
+	reader = NewMessageReader(rawBatchMessage10)
+	meta, err = reader.ReadMessageMetadata()
+	if err != nil {
+		t.Fatal(err)
+	}
+	assert.Equal(t, 10, int(meta.GetNumMessagesInBatch()))
+}
+
+
+func TestReadMessageOldFormat(t *testing.T) {
+	reader := NewMessageReader(rawCompatSingleMessage)
+	_, err := reader.ReadMessageMetadata()
+	if err != nil {
+		t.Fatal(err)
 	}
 
-	m := list[0]
-	if get, want := string(m.SinglePayload), "hello-pulsar"; get != want {
-		t.Errorf("want %v, but get %v", get, want)
+	ssm, payload, err := reader.ReadMessage()
+	if err != nil {
+		t.Fatal(err)
 	}
+	// old message format does not have a single message metadata
+	assert.Equal(t, true, ssm == nil)
+	assert.Equal(t, "hello", string(payload))
+
+	_ , _, err = reader.ReadMessage()
+	assert.Equal(t, ErrEOM, err)
+}
+
+
+func TestReadMessagesBatchSize1(t *testing.T) {
+	reader := NewMessageReader(rawBatchMessage1)
+	meta, err := reader.ReadMessageMetadata()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	assert.Equal(t, 1, int(meta.GetNumMessagesInBatch()))
+	for i := 0; i < int(meta.GetNumMessagesInBatch()); i++ {
+		ssm, payload, err := reader.ReadMessage()
+		if err != nil {
+			t.Fatal(err)
+		}
+		assert.Equal(t, true, ssm != nil)
+		assert.Equal(t, "hello", string(payload))
+	}
+
+	_ , _, err = reader.ReadMessage()
+	assert.Equal(t, ErrEOM, err)
+}
+
+
+func TestReadMessagesBatchSize10(t *testing.T) {
+	reader := NewMessageReader(rawBatchMessage10)
+	meta, err := reader.ReadMessageMetadata()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	assert.Equal(t, 10, int(meta.GetNumMessagesInBatch()))
+	for i := 0; i < int(meta.GetNumMessagesInBatch()); i++ {
+		ssm, payload, err := reader.ReadMessage()
+		if err != nil {
+			t.Fatal(err)
+		}
+		assert.Equal(t, true, ssm != nil)
+		assert.Equal(t, "hello", string(payload))
+	}
+
+	_ , _, err = reader.ReadMessage()
+	assert.Equal(t, ErrEOM, err)
+}
+
+
+// Raw single message in old format
+// metadata properties:<key:"a" value:"1" > properties:<key:"b" value:"2" >
+// payload = "hello"
+var rawCompatSingleMessage = []byte{
+	0x0e, 0x01, 0x08, 0x36, 0xb4, 0x66, 0x00, 0x00,
+	0x00, 0x31, 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e,
+	0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x37,
+	0x34, 0x2d, 0x30, 0x10, 0x00, 0x18, 0xac, 0xef,
+	0xe8, 0xa0, 0xe2, 0x2d, 0x22, 0x06, 0x0a, 0x01,
+	0x61, 0x12, 0x01, 0x31, 0x22, 0x06, 0x0a, 0x01,
+	0x62, 0x12, 0x01, 0x32, 0x48, 0x05, 0x60, 0x05,
+	0x82, 0x01, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f,
+}
+
+// Message with batch of 1
+// single message metadata properties:<key:"a" value:"1" > properties:<key:"b" value:"2" >
+// payload = "hello"
+var rawBatchMessage1 = []byte{
+	0x0e, 0x01, 0x1f, 0x80, 0x09, 0x68, 0x00, 0x00,
+	0x00, 0x1f, 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e,
+	0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x37,
+	0x34, 0x2d, 0x31, 0x10, 0x00, 0x18, 0xdb, 0x80,
+	0xf4, 0xa0, 0xe2, 0x2d, 0x58, 0x01, 0x82, 0x01,
+	0x00, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a,
+	0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a,
+	0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28,
+	0x05, 0x40, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f,
+}
+
+// Message with batch of 10
+// single message metadata properties:<key:"a" value:"1" > properties:<key:"b" value:"2" >
+// payload = "hello"
+var rawBatchMessage10 = []byte{
+	0x0e, 0x01, 0x7b, 0x28, 0x8c, 0x08,
+	0x00, 0x00, 0x00, 0x1f, 0x0a, 0x0f, 0x73, 0x74,
+	0x61, 0x6e, 0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65,
+	0x2d, 0x37, 0x34, 0x2d, 0x32, 0x10, 0x00, 0x18,
+	0xd0, 0xc2, 0xfa, 0xa0, 0xe2, 0x2d, 0x58, 0x0a,
+	0x82, 0x01, 0x00, 0x00, 0x00, 0x00, 0x16, 0x0a,
+	0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a,
+	0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18,
+	0x05, 0x28, 0x05, 0x40, 0x00, 0x68, 0x65, 0x6c,
+	0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06,
+	0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06,
+	0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05,
+	0x28, 0x05, 0x40, 0x01, 0x68, 0x65, 0x6c, 0x6c,
+	0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a,
+	0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a,
+	0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28,
+	0x05, 0x40, 0x02, 0x68, 0x65, 0x6c, 0x6c, 0x6f,
+	0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01,
+	0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01,
+	0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28, 0x05,
+	0x40, 0x03, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00,
+	0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61,
+	0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62,
+	0x12, 0x01, 0x32, 0x18, 0x05, 0x28, 0x05, 0x40,
+	0x04, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00,
+	0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12,
+	0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12,
+	0x01, 0x32, 0x18, 0x05, 0x28, 0x05, 0x40, 0x05,
+	0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00,
+	0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12, 0x01,
+	0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12, 0x01,
+	0x32, 0x18, 0x05, 0x28, 0x05, 0x40, 0x06, 0x68,
+	0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16,
+	0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31,
+	0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32,
+	0x18, 0x05, 0x28, 0x05, 0x40, 0x07, 0x68, 0x65,
+	0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a,
+	0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a,
+	0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18,
+	0x05, 0x28, 0x05, 0x40, 0x08, 0x68, 0x65, 0x6c,
+	0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06,
+	0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06,
+	0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05,
+	0x28, 0x05, 0x40, 0x09, 0x68, 0x65, 0x6c, 0x6c,
+	0x6f,
 }