You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2019/08/06 15:39:44 UTC

[pulsar-client-go] branch master updated: [issue:40]Fix producer send protocol error (#42)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new d049757  [issue:40]Fix producer send protocol error (#42)
d049757 is described below

commit d0497579e09814eb0aa1ee026a338d4fe0ba0771
Author: 冉小龙 <ra...@gmail.com>
AuthorDate: Tue Aug 6 23:39:39 2019 +0800

    [issue:40]Fix producer send protocol error (#42)
    
    * [issue:40]Fix producer send protocol error
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
    
    * fix comments and check code format
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
---
 pulsar/consumer_test.go          |  13 ++++-
 pulsar/internal/commands.go      | 104 ++++++++++++++++++++++++++++-----------
 pulsar/internal/commands_test.go |  17 +++++++
 3 files changed, 103 insertions(+), 31 deletions(-)

diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 8eeb784..39646d3 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -50,7 +50,7 @@ func TestProducerConsumer(t *testing.T) {
 	consumer, err := client.Subscribe(ConsumerOptions{
 		Topic:            topic,
 		SubscriptionName: "my-sub",
-		Type:             Shared,
+		Type:             Exclusive,
 	})
 	assert.Nil(t, err)
 	defer consumer.Close()
@@ -67,6 +67,10 @@ func TestProducerConsumer(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		if err := producer.Send(ctx, &ProducerMessage{
 			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+			Key:     "pulsar",
+			Properties: map[string]string{
+				"key-1": "pulsar-1",
+			},
 		}); err != nil {
 			log.Fatal(err)
 		}
@@ -80,7 +84,12 @@ func TestProducerConsumer(t *testing.T) {
 		}
 
 		expectMsg := fmt.Sprintf("hello-%d", i)
+		expectProperties := map[string]string{
+			"key-1": "pulsar-1",
+		}
 		assert.Equal(t, []byte(expectMsg), msg.Payload())
+		assert.Equal(t, "pulsar", msg.Key())
+		assert.Equal(t, expectProperties, msg.Properties())
 
 		// ack message
 		if err := consumer.Ack(msg); err != nil {
@@ -213,7 +222,7 @@ func makeHTTPCall(t *testing.T, method string, urls string, body string) {
 	defer res.Body.Close()
 }
 
-func TestConsumerShared(t *testing.T) {
+func TestConsumerKeyShared(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: lookupURL,
 	})
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 5068ebb..04545c1 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -18,20 +18,20 @@
 package internal
 
 import (
-    `bytes`
-    `encoding/binary`
-    `fmt`
-    "github.com/golang/protobuf/proto"
-    `io`
-
-    "github.com/apache/pulsar-client-go/pkg/pb"
-    log "github.com/sirupsen/logrus"
+	"bytes"
+	"encoding/binary"
+	"fmt"
+	"github.com/golang/protobuf/proto"
+	"io"
+
+	"github.com/apache/pulsar-client-go/pkg/pb"
+	log "github.com/sirupsen/logrus"
 )
 
 const (
 	// MaxFrameSize limit the maximum size that pulsar allows for messages to be sent.
-	MaxFrameSize = 5 * 1024 * 1024
-	magicCrc32c uint16 = 0x0e01
+	MaxFrameSize        = 5 * 1024 * 1024
+	magicCrc32c  uint16 = 0x0e01
 )
 
 func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand {
@@ -82,6 +82,7 @@ func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload [
 		log.WithError(err).Fatal("Protobuf serialization error")
 	}
 
+	wb.WriteUint32(uint32(len(serialized)))
 	wb.Write(serialized)
 	wb.Write(payload)
 }
@@ -150,7 +151,7 @@ func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloa
 	// guard against allocating large buffer
 	if metadataSize > MaxFrameSize {
 		return nil, nil, fmt.Errorf("frame metadata size (%d) "+
-				"cannot b greater than max frame size (%d)", metadataSize, MaxFrameSize)
+			"cannot b greater than max frame size (%d)", metadataSize, MaxFrameSize)
 	}
 
 	// Read protobuf encoded metadata
@@ -163,28 +164,30 @@ func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloa
 		return nil, nil, err
 	}
 
-    batchLen := make([]byte, 2)
-    if _, err = io.ReadFull(lr, batchLen); err != nil {
-        return nil, nil, err
-    }
-
-    // Anything left in the frame is considered
-    // the payload and can be any sequence of bytes.
-	if lr.N > 0 {
-		// guard against allocating large buffer
-		if lr.N > MaxFrameSize {
-			return nil, nil, fmt.Errorf("frame payload size (%d) "+
-					"cannot be greater than max frame size (%d)", lr.N, MaxFrameSize)
-		}
-		payload = make([]byte, lr.N)
-		if _, err = io.ReadFull(lr, payload); err != nil {
-			return nil, nil, err
-		}
+	// Anything left in the frame is considered
+	// the payload and can be any sequence of bytes.
+	payloads := make([]byte, lr.N)
+	if _, err = io.ReadFull(lr, payloads); err != nil {
+		return nil, nil, err
+	}
+
+	numMsg := msgMeta.GetNumMessagesInBatch()
+
+	singleMessages, err := decodeBatchPayload(payloads, numMsg)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	for _, singleMsg := range singleMessages {
+		payload = singleMsg.SinglePayload
+		msgMeta.PartitionKey = singleMsg.SingleMeta.PartitionKey
+		msgMeta.Properties = singleMsg.SingleMeta.Properties
+		msgMeta.EventTime = singleMsg.SingleMeta.EventTime
 	}
 
 	if computed := chksum.compute(); !bytes.Equal(computed, expectedChksum) {
 		return nil, nil, fmt.Errorf("checksum mismatch: computed (0x%X) does "+
-				"not match given checksum (0x%X)", computed, expectedChksum)
+			"not match given checksum (0x%X)", computed, expectedChksum)
 	}
 
 	return msgMeta, payload, nil
@@ -237,6 +240,49 @@ func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageM
 	wb.PutUint32(checksum, checksumIdx)
 }
 
+// singleMessage represents one of the elements of the batch type payload
+type singleMessage struct {
+	SingleMetaSize uint32
+	SingleMeta     *pb.SingleMessageMetadata
+	SinglePayload  []byte
+}
+
+// decodeBatchPayload parses the payload of the batch type
+// If the producer uses the batch function, msg.Payload will be a singleMessage array structure.
+func decodeBatchPayload(bp []byte, batchNum int32) ([]*singleMessage, error) {
+	buf32 := make([]byte, 4)
+	rdBuf := bytes.NewReader(bp)
+	list := make([]*singleMessage, 0, batchNum)
+	for i := int32(0); i < batchNum; i++ {
+		// singleMetaSize
+		if _, err := io.ReadFull(rdBuf, buf32); err != nil {
+			return nil, err
+		}
+		singleMetaSize := binary.BigEndian.Uint32(buf32)
+
+		// singleMeta
+		singleMetaBuf := make([]byte, singleMetaSize)
+		if _, err := io.ReadFull(rdBuf, singleMetaBuf); err != nil {
+			return nil, err
+		}
+		singleMeta := new(pb.SingleMessageMetadata)
+		if err := proto.Unmarshal(singleMetaBuf, singleMeta); err != nil {
+			return nil, err
+		}
+		// payload
+		singlePayload := make([]byte, singleMeta.GetPayloadSize())
+		if _, err := io.ReadFull(rdBuf, singlePayload); err != nil {
+			return nil, err
+		}
+		d := &singleMessage{}
+		d.SingleMetaSize = singleMetaSize
+		d.SingleMeta = singleMeta
+		d.SinglePayload = singlePayload
+		list = append(list, d)
+	}
+	return list, nil
+}
+
 // ConvertFromStringMap convert a string map to a KeyValue []byte
 func ConvertFromStringMap(m map[string]string) []*pb.KeyValue {
 	list := make([]*pb.KeyValue, len(m))
diff --git a/pulsar/internal/commands_test.go b/pulsar/internal/commands_test.go
index 3ae35c9..0102956 100644
--- a/pulsar/internal/commands_test.go
+++ b/pulsar/internal/commands_test.go
@@ -37,3 +37,20 @@ func TestConvertStringMap(t *testing.T) {
 	assert.Equal(t, "1", m2["a"])
 	assert.Equal(t, "2", m2["b"])
 }
+
+func TestDecodeBatchPayload(t *testing.T) {
+	// singleMsg = singleMetaSize(4  bytes) + singleMeta(var length) + payload
+	singleMsg := []byte{0, 0, 0, 2, 24, 12, 104, 101, 108, 108, 111, 45, 112, 117, 108, 115, 97, 114}
+	list, err := decodeBatchPayload(singleMsg, 1)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if get, want := len(list), 1; get != want {
+		t.Errorf("want %v, but get %v", get, want)
+	}
+
+	m := list[0]
+	if get, want := string(m.SinglePayload), "hello-pulsar"; get != want {
+		t.Errorf("want %v, but get %v", get, want)
+	}
+}