You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2019/08/06 15:39:44 UTC
[pulsar-client-go] branch master updated: [issue:40]Fix producer
send protocol error (#42)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new d049757 [issue:40]Fix producer send protocol error (#42)
d049757 is described below
commit d0497579e09814eb0aa1ee026a338d4fe0ba0771
Author: 冉小龙 <ra...@gmail.com>
AuthorDate: Tue Aug 6 23:39:39 2019 +0800
[issue:40]Fix producer send protocol error (#42)
* [issue:40]Fix producer send protocol error
Signed-off-by: xiaolong.ran <ra...@gmail.com>
* fix comments and check code format
Signed-off-by: xiaolong.ran <ra...@gmail.com>
---
pulsar/consumer_test.go | 13 ++++-
pulsar/internal/commands.go | 104 ++++++++++++++++++++++++++++-----------
pulsar/internal/commands_test.go | 17 +++++++
3 files changed, 103 insertions(+), 31 deletions(-)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 8eeb784..39646d3 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -50,7 +50,7 @@ func TestProducerConsumer(t *testing.T) {
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
- Type: Shared,
+ Type: Exclusive,
})
assert.Nil(t, err)
defer consumer.Close()
@@ -67,6 +67,10 @@ func TestProducerConsumer(t *testing.T) {
for i := 0; i < 10; i++ {
if err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ Key: "pulsar",
+ Properties: map[string]string{
+ "key-1": "pulsar-1",
+ },
}); err != nil {
log.Fatal(err)
}
@@ -80,7 +84,12 @@ func TestProducerConsumer(t *testing.T) {
}
expectMsg := fmt.Sprintf("hello-%d", i)
+ expectProperties := map[string]string{
+ "key-1": "pulsar-1",
+ }
assert.Equal(t, []byte(expectMsg), msg.Payload())
+ assert.Equal(t, "pulsar", msg.Key())
+ assert.Equal(t, expectProperties, msg.Properties())
// ack message
if err := consumer.Ack(msg); err != nil {
@@ -213,7 +222,7 @@ func makeHTTPCall(t *testing.T, method string, urls string, body string) {
defer res.Body.Close()
}
-func TestConsumerShared(t *testing.T) {
+func TestConsumerKeyShared(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 5068ebb..04545c1 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -18,20 +18,20 @@
package internal
import (
- `bytes`
- `encoding/binary`
- `fmt`
- "github.com/golang/protobuf/proto"
- `io`
-
- "github.com/apache/pulsar-client-go/pkg/pb"
- log "github.com/sirupsen/logrus"
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "github.com/golang/protobuf/proto"
+ "io"
+
+ "github.com/apache/pulsar-client-go/pkg/pb"
+ log "github.com/sirupsen/logrus"
)
const (
// MaxFrameSize limits the maximum size that pulsar allows for messages to be sent.
- MaxFrameSize = 5 * 1024 * 1024
- magicCrc32c uint16 = 0x0e01
+ MaxFrameSize = 5 * 1024 * 1024
+ magicCrc32c uint16 = 0x0e01
)
func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand {
@@ -82,6 +82,7 @@ func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload [
log.WithError(err).Fatal("Protobuf serialization error")
}
+ wb.WriteUint32(uint32(len(serialized)))
wb.Write(serialized)
wb.Write(payload)
}
@@ -150,7 +151,7 @@ func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloa
// guard against allocating large buffer
if metadataSize > MaxFrameSize {
return nil, nil, fmt.Errorf("frame metadata size (%d) "+
- "cannot b greater than max frame size (%d)", metadataSize, MaxFrameSize)
+ "cannot b greater than max frame size (%d)", metadataSize, MaxFrameSize)
}
// Read protobuf encoded metadata
@@ -163,28 +164,30 @@ func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloa
return nil, nil, err
}
- batchLen := make([]byte, 2)
- if _, err = io.ReadFull(lr, batchLen); err != nil {
- return nil, nil, err
- }
-
- // Anything left in the frame is considered
- // the payload and can be any sequence of bytes.
- if lr.N > 0 {
- // guard against allocating large buffer
- if lr.N > MaxFrameSize {
- return nil, nil, fmt.Errorf("frame payload size (%d) "+
- "cannot be greater than max frame size (%d)", lr.N, MaxFrameSize)
- }
- payload = make([]byte, lr.N)
- if _, err = io.ReadFull(lr, payload); err != nil {
- return nil, nil, err
- }
+ // Anything left in the frame is considered
+ // the payload and can be any sequence of bytes.
+ payloads := make([]byte, lr.N)
+ if _, err = io.ReadFull(lr, payloads); err != nil {
+ return nil, nil, err
+ }
+
+ numMsg := msgMeta.GetNumMessagesInBatch()
+
+ singleMessages, err := decodeBatchPayload(payloads, numMsg)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ for _, singleMsg := range singleMessages {
+ payload = singleMsg.SinglePayload
+ msgMeta.PartitionKey = singleMsg.SingleMeta.PartitionKey
+ msgMeta.Properties = singleMsg.SingleMeta.Properties
+ msgMeta.EventTime = singleMsg.SingleMeta.EventTime
}
if computed := chksum.compute(); !bytes.Equal(computed, expectedChksum) {
return nil, nil, fmt.Errorf("checksum mismatch: computed (0x%X) does "+
- "not match given checksum (0x%X)", computed, expectedChksum)
+ "not match given checksum (0x%X)", computed, expectedChksum)
}
return msgMeta, payload, nil
@@ -237,6 +240,49 @@ func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageM
wb.PutUint32(checksum, checksumIdx)
}
+// singleMessage is one decoded entry of a batch payload: metadata size prefix, metadata, and message body.
+type singleMessage struct {
+ SingleMetaSize uint32 // byte length of the serialized metadata, as read from the entry's 4-byte prefix
+ SingleMeta *pb.SingleMessageMetadata // per-message metadata unmarshalled from the entry
+ SinglePayload []byte // message body; decodeBatchPayload sizes it from SingleMeta.GetPayloadSize()
+}
+
+// decodeBatchPayload splits a batch payload into its individual messages.
+//
+// bp is read as batchNum consecutive entries, each laid out as
+//
+//	[4-byte big-endian metadata size][SingleMessageMetadata protobuf][payload]
+//
+// where the payload length comes from the metadata's GetPayloadSize().
+// The decoded entries are returned in the order they appear in bp.
+//
+// An error is returned when batchNum is negative, when bp ends before all
+// entries were read, when a declared metadata or payload size does not fit
+// in the bytes that remain, or when the metadata cannot be unmarshalled.
+func decodeBatchPayload(bp []byte, batchNum int32) ([]*singleMessage, error) {
+	if batchNum < 0 {
+		return nil, fmt.Errorf("invalid number of messages in batch: %d", batchNum)
+	}
+
+	buf32 := make([]byte, 4)
+	rdBuf := bytes.NewReader(bp)
+
+	// batchNum comes off the wire; every entry needs at least its 4-byte
+	// size prefix, so never pre-allocate more slots than bp could hold.
+	capHint := int(batchNum)
+	if maxEntries := len(bp) / 4; capHint > maxEntries {
+		capHint = maxEntries
+	}
+	list := make([]*singleMessage, 0, capHint)
+
+	for i := int32(0); i < batchNum; i++ {
+		// singleMetaSize
+		if _, err := io.ReadFull(rdBuf, buf32); err != nil {
+			return nil, err
+		}
+		singleMetaSize := binary.BigEndian.Uint32(buf32)
+
+		// Reject sizes larger than what is left before allocating, so a
+		// corrupt prefix cannot trigger a multi-gigabyte allocation.
+		if uint64(singleMetaSize) > uint64(rdBuf.Len()) {
+			return nil, fmt.Errorf("batch entry %d: metadata size (%d) "+
+				"exceeds remaining payload (%d)", i, singleMetaSize, rdBuf.Len())
+		}
+
+		// singleMeta
+		singleMetaBuf := make([]byte, singleMetaSize)
+		if _, err := io.ReadFull(rdBuf, singleMetaBuf); err != nil {
+			return nil, err
+		}
+		singleMeta := new(pb.SingleMessageMetadata)
+		if err := proto.Unmarshal(singleMetaBuf, singleMeta); err != nil {
+			return nil, err
+		}
+
+		// payload: the size is a signed field, so a negative value would
+		// make the allocation below panic.
+		payloadSize := singleMeta.GetPayloadSize()
+		if payloadSize < 0 || int64(payloadSize) > int64(rdBuf.Len()) {
+			return nil, fmt.Errorf("batch entry %d: payload size (%d) "+
+				"is invalid for remaining payload (%d)", i, payloadSize, rdBuf.Len())
+		}
+		singlePayload := make([]byte, payloadSize)
+		if _, err := io.ReadFull(rdBuf, singlePayload); err != nil {
+			return nil, err
+		}
+
+		list = append(list, &singleMessage{
+			SingleMetaSize: singleMetaSize,
+			SingleMeta:     singleMeta,
+			SinglePayload:  singlePayload,
+		})
+	}
+	return list, nil
+}
+
// ConvertFromStringMap converts a string map to a slice of *pb.KeyValue
func ConvertFromStringMap(m map[string]string) []*pb.KeyValue {
list := make([]*pb.KeyValue, len(m))
diff --git a/pulsar/internal/commands_test.go b/pulsar/internal/commands_test.go
index 3ae35c9..0102956 100644
--- a/pulsar/internal/commands_test.go
+++ b/pulsar/internal/commands_test.go
@@ -37,3 +37,20 @@ func TestConvertStringMap(t *testing.T) {
assert.Equal(t, "1", m2["a"])
assert.Equal(t, "2", m2["b"])
}
+
+// TestDecodeBatchPayload checks that a hand-encoded single-entry batch
+// decodes to exactly one message carrying the expected payload.
+func TestDecodeBatchPayload(t *testing.T) {
+	// singleMsg = singleMetaSize(4 bytes) + singleMeta(var length) + payload
+	//   0, 0, 0, 2 -> the metadata is 2 bytes long
+	//   24, 12     -> protobuf tag 24 (field 3, varint) with value 12,
+	//                 matching the 12 payload bytes that follow
+	//   104, ...   -> "hello-pulsar"
+	singleMsg := []byte{0, 0, 0, 2, 24, 12, 104, 101, 108, 108, 111, 45, 112, 117, 108, 115, 97, 114}
+	list, err := decodeBatchPayload(singleMsg, 1)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Fatalf rather than Errorf: list[0] below would panic on an empty result.
+	if get, want := len(list), 1; get != want {
+		t.Fatalf("want %v, but get %v", want, get)
+	}
+
+	m := list[0]
+	if get, want := string(m.SinglePayload), "hello-pulsar"; get != want {
+		t.Errorf("want %v, but get %v", want, get)
+	}
+}