You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2019/07/07 14:38:29 UTC

[rocketmq-client-go] branch native updated: [ISSUE #65] Fix message missing tag properties (#94)

This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 9df5e28  [ISSUE #65] Fix message missing tag  properties  (#94)
9df5e28 is described below

commit 9df5e28d8c1149138f784a27d639edcc9451a690
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Sun Jul 7 22:38:25 2019 +0800

    [ISSUE #65] Fix message missing tag  properties  (#94)
    
    * fix message properties. fix #65
    
    * simplify commit
    
    * simplify commit
---
 examples/producer/interceptor/main.go |  6 ++++--
 internal/kernel/request.go            |  2 ++
 internal/producer/producer.go         | 13 ++-----------
 primitive/result.go                   | 34 +++++++++++++++++++++++++++++++---
 primitive/result_test.go              | 34 ++++++++++++++++++++++++++++++++++
 5 files changed, 73 insertions(+), 16 deletions(-)

diff --git a/examples/producer/interceptor/main.go b/examples/producer/interceptor/main.go
index f7bcf7a..c70eab3 100644
--- a/examples/producer/interceptor/main.go
+++ b/examples/producer/interceptor/main.go
@@ -22,6 +22,7 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"strconv"
 
 	"github.com/apache/rocketmq-client-go/internal/producer"
 	"github.com/apache/rocketmq-client-go/primitive"
@@ -41,6 +42,7 @@ func main() {
 			//Topic: "test",
 			Topic: "TopicTest",
 			Body:  []byte("Hello RocketMQ Go Client!"),
+			Properties: map[string]string{"order": strconv.Itoa(i)},
 		})
 
 		if err != nil {
@@ -57,7 +59,7 @@ func main() {
 
 func UserFirstInterceptor() primitive.PInterceptor {
 	return func(ctx context.Context, req, reply interface{}, next primitive.PInvoker) error {
-		fmt.Printf("user first interceptor before invoke: req:%v, reply: %v\n", req, reply)
+		fmt.Printf("user first interceptor before invoke: req:%v\n", req)
 		err := next(ctx, req, reply)
 		fmt.Printf("user first interceptor after invoke: req: %v, reply: %v \n", req, reply)
 		return err
@@ -66,7 +68,7 @@ func UserFirstInterceptor() primitive.PInterceptor {
 
 func UserSecondInterceptor() primitive.PInterceptor {
 	return func(ctx context.Context, req, reply interface{}, next primitive.PInvoker) error {
-		fmt.Printf("user second interceptor before invoke: req: %v, reply: %v\n", req, reply)
+		fmt.Printf("user second interceptor before invoke: req: %v\n", req)
 		err := next(ctx, req, reply)
 		fmt.Printf("user second interceptor after invoke: req: %v, reply: %v \n", req, reply)
 		return err
diff --git a/internal/kernel/request.go b/internal/kernel/request.go
index 5fe1a44..ad3345b 100644
--- a/internal/kernel/request.go
+++ b/internal/kernel/request.go
@@ -71,6 +71,8 @@ func (request *SendMessageRequest) Encode() map[string]string {
 	maps["defaultTopic"] = "TBW102"
 	maps["defaultTopicQueueNums"] = "4"
 	maps["batch"] = strconv.FormatBool(request.Batch)
+	maps["properties"] = request.Properties
+
 	return maps
 }
 
diff --git a/internal/producer/producer.go b/internal/producer/producer.go
index fbcf5ee..a559a98 100644
--- a/internal/producer/producer.go
+++ b/internal/producer/producer.go
@@ -228,11 +228,12 @@ func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue,
 		SysFlag:        0,
 		BornTimestamp:  time.Now().UnixNano() / int64(time.Millisecond),
 		Flag:           msg.Flag,
-		Properties:     propertiesToString(msg.Properties),
+		Properties:     primitive.MarshalPropeties(msg.Properties),
 		ReconsumeTimes: 0,
 		UnitMode:       p.options.UnitMode,
 		Batch:          false,
 	}
+
 	return remote.NewRemotingCommand(kernel.ReqSendMessage, req, msg.Body)
 }
 
@@ -289,13 +290,3 @@ func (p *defaultProducer) IsUnitMode() bool {
 	return false
 }
 
-func propertiesToString(properties map[string]string) string {
-	if properties == nil {
-		return ""
-	}
-	var str string
-	for k, v := range properties {
-		str += fmt.Sprintf("%s%v%s%v", k, byte(1), v, byte(2))
-	}
-	return str
-}
diff --git a/primitive/result.go b/primitive/result.go
index 628f243..217a8f3 100644
--- a/primitive/result.go
+++ b/primitive/result.go
@@ -36,6 +36,9 @@ const (
 
 	FlagCompressed = 0x1
 	MsgIdLength    = 8 + 8
+
+	propertySeparator  = '\002'
+	nameValueSeparator = '\001'
 )
 
 // SendResult RocketMQ send result
@@ -191,10 +194,11 @@ func DecodeMessage(data []byte) []*MessageExt {
 		msg.Topic = string(buf.Next(int(_byte)))
 		count += 1 + int(_byte)
 
+		// 17. properties
 		var propertiesLength int16
 		binary.Read(buf, binary.BigEndian, &propertiesLength)
 		if propertiesLength > 0 {
-			msg.Properties = parseProperties(buf.Next(int(propertiesLength)))
+			msg.Properties = unmarshalProperties(buf.Next(int(propertiesLength)))
 		}
 		count += 2 + int(propertiesLength)
 
@@ -211,8 +215,32 @@ func createMessageId(addr []byte, offset int64) string {
 	return "msgID" // TODO
 }
 
-func parseProperties(data []byte) map[string]string {
-	return make(map[string]string, 0)
+// unmarshalProperties parse data into property kv pairs.
+func unmarshalProperties(data []byte) map[string]string {
+	m := make(map[string]string)
+	items := bytes.Split(data, []byte{propertySeparator})
+	for _, item := range items {
+		kv := bytes.Split(item, []byte{nameValueSeparator})
+		if len(kv) == 2 {
+			m[ string(kv[0]) ] = string(kv[1])
+		}
+	}
+	return m
+}
+
+func MarshalPropeties(properties map[string]string) string {
+	if properties == nil {
+		return ""
+	}
+	buffer := bytes.NewBufferString("")
+
+	for k, v := range properties {
+		buffer.WriteString(k)
+		buffer.WriteRune(nameValueSeparator)
+		buffer.WriteString(v)
+		buffer.WriteRune(propertySeparator)
+	}
+	return buffer.String()
 }
 
 func toMessages(messageExts []*MessageExt) []*Message {
diff --git a/primitive/result_test.go b/primitive/result_test.go
new file mode 100644
index 0000000..131db8c
--- /dev/null
+++ b/primitive/result_test.go
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package primitive
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func Test(t *testing.T) {
+	kv := map[string]string{
+		"k1": "v1",
+		"k2": "v2",
+	}
+	str := MarshalPropeties(kv)
+	kv2 := unmarshalProperties([]byte(str))
+	assert.Equal(t, kv, kv2)
+}