You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2019/07/07 14:38:29 UTC
[rocketmq-client-go] branch native updated: [ISSUE #65] Fix message missing tag properties (#94)
This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 9df5e28 [ISSUE #65] Fix message missing tag properties (#94)
9df5e28 is described below
commit 9df5e28d8c1149138f784a27d639edcc9451a690
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Sun Jul 7 22:38:25 2019 +0800
[ISSUE #65] Fix message missing tag properties (#94)
* fix message properties. fix #65
* simplify commit
* simplify commit
---
examples/producer/interceptor/main.go | 6 ++++--
internal/kernel/request.go | 2 ++
internal/producer/producer.go | 13 ++-----------
primitive/result.go | 34 +++++++++++++++++++++++++++++++---
primitive/result_test.go | 34 ++++++++++++++++++++++++++++++++++
5 files changed, 73 insertions(+), 16 deletions(-)
diff --git a/examples/producer/interceptor/main.go b/examples/producer/interceptor/main.go
index f7bcf7a..c70eab3 100644
--- a/examples/producer/interceptor/main.go
+++ b/examples/producer/interceptor/main.go
@@ -22,6 +22,7 @@ import (
"context"
"fmt"
"os"
+ "strconv"
"github.com/apache/rocketmq-client-go/internal/producer"
"github.com/apache/rocketmq-client-go/primitive"
@@ -41,6 +42,7 @@ func main() {
//Topic: "test",
Topic: "TopicTest",
Body: []byte("Hello RocketMQ Go Client!"),
+ Properties: map[string]string{"order": strconv.Itoa(i)},
})
if err != nil {
@@ -57,7 +59,7 @@ func main() {
func UserFirstInterceptor() primitive.PInterceptor {
return func(ctx context.Context, req, reply interface{}, next primitive.PInvoker) error {
- fmt.Printf("user first interceptor before invoke: req:%v, reply: %v\n", req, reply)
+ fmt.Printf("user first interceptor before invoke: req:%v\n", req)
err := next(ctx, req, reply)
fmt.Printf("user first interceptor after invoke: req: %v, reply: %v \n", req, reply)
return err
@@ -66,7 +68,7 @@ func UserFirstInterceptor() primitive.PInterceptor {
func UserSecondInterceptor() primitive.PInterceptor {
return func(ctx context.Context, req, reply interface{}, next primitive.PInvoker) error {
- fmt.Printf("user second interceptor before invoke: req: %v, reply: %v\n", req, reply)
+ fmt.Printf("user second interceptor before invoke: req: %v\n", req)
err := next(ctx, req, reply)
fmt.Printf("user second interceptor after invoke: req: %v, reply: %v \n", req, reply)
return err
diff --git a/internal/kernel/request.go b/internal/kernel/request.go
index 5fe1a44..ad3345b 100644
--- a/internal/kernel/request.go
+++ b/internal/kernel/request.go
@@ -71,6 +71,8 @@ func (request *SendMessageRequest) Encode() map[string]string {
maps["defaultTopic"] = "TBW102"
maps["defaultTopicQueueNums"] = "4"
maps["batch"] = strconv.FormatBool(request.Batch)
+ maps["properties"] = request.Properties
+
return maps
}
diff --git a/internal/producer/producer.go b/internal/producer/producer.go
index fbcf5ee..a559a98 100644
--- a/internal/producer/producer.go
+++ b/internal/producer/producer.go
@@ -228,11 +228,12 @@ func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue,
SysFlag: 0,
BornTimestamp: time.Now().UnixNano() / int64(time.Millisecond),
Flag: msg.Flag,
- Properties: propertiesToString(msg.Properties),
+ Properties: primitive.MarshalPropeties(msg.Properties),
ReconsumeTimes: 0,
UnitMode: p.options.UnitMode,
Batch: false,
}
+
return remote.NewRemotingCommand(kernel.ReqSendMessage, req, msg.Body)
}
@@ -289,13 +290,3 @@ func (p *defaultProducer) IsUnitMode() bool {
return false
}
-func propertiesToString(properties map[string]string) string {
- if properties == nil {
- return ""
- }
- var str string
- for k, v := range properties {
- str += fmt.Sprintf("%s%v%s%v", k, byte(1), v, byte(2))
- }
- return str
-}
diff --git a/primitive/result.go b/primitive/result.go
index 628f243..217a8f3 100644
--- a/primitive/result.go
+++ b/primitive/result.go
@@ -36,6 +36,9 @@ const (
FlagCompressed = 0x1
MsgIdLength = 8 + 8
+
+ propertySeparator = '\002'
+ nameValueSeparator = '\001'
)
// SendResult RocketMQ send result
@@ -191,10 +194,11 @@ func DecodeMessage(data []byte) []*MessageExt {
msg.Topic = string(buf.Next(int(_byte)))
count += 1 + int(_byte)
+ // 17. properties
var propertiesLength int16
binary.Read(buf, binary.BigEndian, &propertiesLength)
if propertiesLength > 0 {
- msg.Properties = parseProperties(buf.Next(int(propertiesLength)))
+ msg.Properties = unmarshalProperties(buf.Next(int(propertiesLength)))
}
count += 2 + int(propertiesLength)
@@ -211,8 +215,32 @@ func createMessageId(addr []byte, offset int64) string {
return "msgID" // TODO
}
-func parseProperties(data []byte) map[string]string {
- return make(map[string]string, 0)
+// unmarshalProperties parse data into property kv pairs.
+func unmarshalProperties(data []byte) map[string]string {
+ m := make(map[string]string)
+ items := bytes.Split(data, []byte{propertySeparator})
+ for _, item := range items {
+ kv := bytes.Split(item, []byte{nameValueSeparator})
+ if len(kv) == 2 {
+ m[ string(kv[0]) ] = string(kv[1])
+ }
+ }
+ return m
+}
+
+func MarshalPropeties(properties map[string]string) string {
+ if properties == nil {
+ return ""
+ }
+ buffer := bytes.NewBufferString("")
+
+ for k, v := range properties {
+ buffer.WriteString(k)
+ buffer.WriteRune(nameValueSeparator)
+ buffer.WriteString(v)
+ buffer.WriteRune(propertySeparator)
+ }
+ return buffer.String()
}
func toMessages(messageExts []*MessageExt) []*Message {
diff --git a/primitive/result_test.go b/primitive/result_test.go
new file mode 100644
index 0000000..131db8c
--- /dev/null
+++ b/primitive/result_test.go
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package primitive
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test(t *testing.T) {
+ kv := map[string]string{
+ "k1": "v1",
+ "k2": "v2",
+ }
+ str := MarshalPropeties(kv)
+ kv2 := unmarshalProperties([]byte(str))
+ assert.Equal(t, kv, kv2)
+}