You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/08/30 13:38:30 UTC
[rocketmq-client-go] branch native updated: [ISSUE #180]add tag
filter example. resolve #180 (#181)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 6480b41 [ISSUE #180]add tag filter example. resolve #180 (#181)
6480b41 is described below
commit 6480b41543b40353f97937361e1b8c29c7f674f9
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Fri Aug 30 21:38:26 2019 +0800
[ISSUE #180]add tag filter example. resolve #180 (#181)
---
consumer/consumer.go | 25 ++++++++------
consumer/consumer_test.go | 6 ++--
consumer/mock_offset_store.go | 17 +++++++++
consumer/push_consumer.go | 6 ++--
examples/{producer/delay => consumer/tag}/main.go | 42 ++++++++++-------------
examples/producer/delay/main.go | 2 +-
examples/producer/{delay => tag}/main.go | 8 +++--
go.mod | 1 +
internal/model.go | 14 ++++----
internal/trace.go | 2 +-
internal/utils/set.go | 30 +++++++++++++---
primitive/message.go | 25 ++++++++------
12 files changed, 112 insertions(+), 66 deletions(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 0564eaf..f7fb759 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -27,13 +27,14 @@ import (
"sync"
"time"
+ "github.com/pkg/errors"
+ "github.com/tidwall/gjson"
+
"github.com/apache/rocketmq-client-go/internal"
"github.com/apache/rocketmq-client-go/internal/remote"
"github.com/apache/rocketmq-client-go/internal/utils"
"github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
- "github.com/pkg/errors"
- "github.com/tidwall/gjson"
)
const (
@@ -276,7 +277,6 @@ func (dc *defaultConsumer) start() error {
dc.subscriptionDataTable.Store(retryTopic, sub)
}
- //dc.client = internal.GetOrNewRocketMQClient(dc.option.ClientOptions, nil)
if dc.model == Clustering {
dc.option.ChangeInstanceNameToPID()
dc.storage = NewRemoteOffsetStore(dc.consumerGroup, dc.client, dc.namesrv)
@@ -799,10 +799,10 @@ func (dc *defaultConsumer) processPullResult(mq *primitive.MessageQueue, result
// filter message according to tags
msgListFilterAgain := msgs
- if len(data.Tags) > 0 && data.ClassFilterMode {
- msgListFilterAgain = make([]*primitive.MessageExt, len(msgs))
+ if data.Tags.Len() > 0 && data.ClassFilterMode {
+ msgListFilterAgain = make([]*primitive.MessageExt, 0)
for _, msg := range msgs {
- _, exist := data.Tags[msg.GetTags()]
+ _, exist := data.Tags.Contains(msg.GetTags())
if exist {
msgListFilterAgain = append(msgListFilterAgain, msg)
}
@@ -929,16 +929,19 @@ func buildSubscriptionData(topic string, selector MessageSelector) *internal.Sub
subData.ExpType = string(TAG)
subData.SubString = _SubAll
} else {
- tags := strings.Split(selector.Expression, "\\|\\|")
+ tags := strings.Split(selector.Expression, "||")
+ subData.Tags = utils.NewSet()
+ subData.Codes = utils.NewSet()
for idx := range tags {
trimString := strings.Trim(tags[idx], " ")
if trimString != "" {
- if !subData.Tags[trimString] {
- subData.Tags[trimString] = true
+ if _, ok := subData.Tags.Contains(trimString); !ok {
+ subData.Tags.AddKV(trimString, trimString)
}
hCode := utils.HashString(trimString)
- if !subData.Codes[int32(hCode)] {
- subData.Codes[int32(hCode)] = true
+ v := strconv.Itoa(hCode)
+ if _, ok := subData.Codes.Contains(v); !ok {
+ subData.Codes.AddKV(v, v)
}
}
}
diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go
index 665547d..a419dc7 100644
--- a/consumer/consumer_test.go
+++ b/consumer/consumer_test.go
@@ -70,7 +70,7 @@ func TestDoRebalance(t *testing.T) {
dc.namesrv = namesrvCli
rmqCli := internal.NewMockRMQClient(ctrl)
- rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).
+ rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&remote.RemotingCommand{
Body: []byte("{\"consumerIdList\": [\"a1\", \"a2\", \"a3\"] }"),
}, nil)
@@ -127,7 +127,7 @@ func TestComputePullFromWhere(t *testing.T) {
broker := "a"
namesrvCli.EXPECT().FindBrokerAddrByName(gomock.Any()).Return(broker)
- rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).
+ rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&remote.RemotingCommand{
ExtFields: map[string]string{
"offset": "20",
@@ -155,7 +155,7 @@ func TestComputePullFromWhere(t *testing.T) {
broker := "a"
namesrvCli.EXPECT().FindBrokerAddrByName(gomock.Any()).Return(broker)
- rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).
+ rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&remote.RemotingCommand{
ExtFields: map[string]string{
"offset": "30",
diff --git a/consumer/mock_offset_store.go b/consumer/mock_offset_store.go
index 093c50d..b9fa4cf 100644
--- a/consumer/mock_offset_store.go
+++ b/consumer/mock_offset_store.go
@@ -1,3 +1,20 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
// Code generated by MockGen. DO NOT EDIT.
// Source: offset_store.go
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index dfee161..cef744a 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -182,8 +182,8 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
if pc.option.ConsumerModel == Clustering {
// add retry topic for clustering mode
retryTopic := internal.GetRetryTopic(pc.consumerGroup)
- data = buildSubscriptionData(retryTopic, MessageSelector{Expression: _SubAll})
- pc.subscriptionDataTable.Store(retryTopic, data)
+ retryData := buildSubscriptionData(retryTopic, MessageSelector{Expression: _SubAll})
+ pc.subscriptionDataTable.Store(retryTopic, retryData)
pc.subscribedTopic[retryTopic] = ""
}
@@ -502,7 +502,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
rt := time.Now().Sub(beginTime) / time.Millisecond
increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))
- result.SetMessageExts(primitive.DecodeMessage(result.GetBody()))
+ pc.processPullResult(request.mq, result, sd)
msgFounded := result.GetMessageExts()
firstMsgOffset := int64(math.MaxInt64)
diff --git a/examples/producer/delay/main.go b/examples/consumer/tag/main.go
similarity index 58%
copy from examples/producer/delay/main.go
copy to examples/consumer/tag/main.go
index 059507f..4a7cc9c 100644
--- a/examples/producer/delay/main.go
+++ b/examples/consumer/tag/main.go
@@ -21,38 +21,34 @@ import (
"context"
"fmt"
"os"
+ "time"
"github.com/apache/rocketmq-client-go"
+ "github.com/apache/rocketmq-client-go/consumer"
"github.com/apache/rocketmq-client-go/primitive"
- "github.com/apache/rocketmq-client-go/producer"
)
func main() {
- p, _ := rocketmq.NewProducer(
- producer.WithNameServer([]string{"127.0.0.1:9876"}),
- producer.WithRetry(2),
+ c, _ := rocketmq.NewPushConsumer(
+ consumer.WithGroupName("testGroup"),
+ consumer.WithNameServer([]string{"127.0.0.1:9876"}),
)
- err := p.Start()
- if err != nil {
- fmt.Printf("start producer error: %s", err.Error())
- os.Exit(1)
+ selector := consumer.MessageSelector{
+ Type: consumer.TAG,
+ Expression: "TagA || TagC",
}
- for i := 0; i < 10; i++ {
- msg := &primitive.Message{
- Topic: "TopicTest",
- Body: []byte("Hello RocketMQ Go Client!"),
- }
- msg.SetDelayTimeLevel(3)
- res, err := p.SendSync(context.Background(), msg)
-
- if err != nil {
- fmt.Printf("send message error: %s\n", err)
- } else {
- fmt.Printf("send message success: result=%s\n", res.String())
- }
+ err := c.Subscribe("TopicTest", selector, func(ctx context.Context,
+ msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
+ fmt.Printf("subscribe callback: %v \n", msgs)
+ return consumer.ConsumeSuccess, nil
+ })
+ if err != nil {
+ fmt.Println(err.Error())
}
- err = p.Shutdown()
+ err = c.Start()
if err != nil {
- fmt.Printf("shundown producer error: %s", err.Error())
+ fmt.Println(err.Error())
+ os.Exit(-1)
}
+ time.Sleep(time.Hour)
}
diff --git a/examples/producer/delay/main.go b/examples/producer/delay/main.go
index 059507f..4466e8d 100644
--- a/examples/producer/delay/main.go
+++ b/examples/producer/delay/main.go
@@ -42,7 +42,7 @@ func main() {
Topic: "TopicTest",
Body: []byte("Hello RocketMQ Go Client!"),
}
- msg.SetDelayTimeLevel(3)
+ msg.WithDelayTimeLevel(3)
res, err := p.SendSync(context.Background(), msg)
if err != nil {
diff --git a/examples/producer/delay/main.go b/examples/producer/tag/main.go
similarity index 93%
copy from examples/producer/delay/main.go
copy to examples/producer/tag/main.go
index 059507f..6b22454 100644
--- a/examples/producer/delay/main.go
+++ b/examples/producer/tag/main.go
@@ -37,14 +37,16 @@ func main() {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
- for i := 0; i < 10; i++ {
+ tags := []string{"TagA", "TagB", "TagC"}
+ for i := 0; i < 3; i++ {
+ tag := tags[i%3]
msg := &primitive.Message{
Topic: "TopicTest",
Body: []byte("Hello RocketMQ Go Client!"),
}
- msg.SetDelayTimeLevel(3)
- res, err := p.SendSync(context.Background(), msg)
+ msg.WithTag(tag)
+ res, err := p.SendSync(context.Background(), msg)
if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
diff --git a/go.mod b/go.mod
index 7ccdc85..b2704fa 100644
--- a/go.mod
+++ b/go.mod
@@ -11,5 +11,6 @@ require (
github.com/tidwall/gjson v1.2.1
github.com/tidwall/match v1.0.1 // indirect
github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 // indirect
+ golang.org/x/tools v0.0.0-20190425150028-36563e24a262
stathat.com/c/consistent v1.0.0
)
diff --git a/internal/model.go b/internal/model.go
index 86b286e..ad86e2e 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -45,13 +45,13 @@ const (
)
type SubscriptionData struct {
- ClassFilterMode bool
- Topic string
- SubString string
- Tags map[string]bool
- Codes map[int32]bool
- SubVersion int64
- ExpType string
+ ClassFilterMode bool `json:"classFilterMode"`
+ Topic string `json:"topic"`
+ SubString string `json:"subString"`
+ Tags utils.Set `json:"tagsSet"`
+ Codes utils.Set `json:"codeSet"`
+ SubVersion int64 `json:"subVersion"`
+ ExpType string `json:"expressionType"`
}
type producerData struct {
diff --git a/internal/trace.go b/internal/trace.go
index 8bc6748..3bceefb 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -389,7 +389,7 @@ func (td *traceDispatcher) flush(topic, regionID string, data []TraceTransferBea
func (td *traceDispatcher) sendTraceDataByMQ(keyset Keyset, regionID string, data string) {
msg := primitive.NewMessage(td.traceTopic, []byte(data))
- msg.SetKeys(keyset.slice())
+ msg.WithKeys(keyset.slice())
mq, addr := td.findMq()
if mq == nil {
diff --git a/internal/utils/set.go b/internal/utils/set.go
index 2f9f214..e90fb36 100644
--- a/internal/utils/set.go
+++ b/internal/utils/set.go
@@ -27,6 +27,12 @@ type UniqueItem interface {
UniqueID() string
}
+type StringUnique string
+
+func (str StringUnique) UniqueID() string {
+ return string(str)
+}
+
type Set struct {
items map[string]UniqueItem
}
@@ -41,6 +47,15 @@ func (s *Set) Add(v UniqueItem) {
s.items[v.UniqueID()] = v
}
+func (s *Set) AddKV(k, v string) {
+ s.items[k] = StringUnique(v)
+}
+
+func (s *Set) Contains(k string) (UniqueItem, bool) {
+ v, ok := s.items[k]
+ return v, ok
+}
+
func (s *Set) Len() int {
return len(s.items)
}
@@ -56,11 +71,18 @@ func (s *Set) MarshalJSON() ([]byte, error) {
buffer.WriteByte('[')
keys := make([]string, 0)
for _, k := range s.items {
- v, err := json.Marshal(k)
- if err != nil {
- return nil, err
+ var key string
+ switch kval := k.(type) {
+ case StringUnique:
+ key = "\"" + string(kval) + "\""
+ default:
+ v, err := json.Marshal(k)
+ if err != nil {
+ return nil, err
+ }
+ key = string(v)
}
- keys = append(keys, string(v))
+ keys = append(keys, key)
}
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
diff --git a/primitive/message.go b/primitive/message.go
index 3dd4a0c..254af06 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -85,14 +85,23 @@ func NewMessage(topic string, body []byte) *Message {
return msg
}
-// SetDelayTimeLevel set message delay time to consume.
+// WithDelayTimeLevel set message delay time to consume.
// reference delay level definition: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
// delay level starts from 1. for example, if we set param level=1, then the delay time is 1s.
-func (msg *Message) SetDelayTimeLevel(level int) {
+func (msg *Message) WithDelayTimeLevel(level int) *Message {
if msg.Properties == nil {
msg.Properties = make(map[string]string)
}
msg.Properties[PropertyDelayTimeLevel] = strconv.Itoa(level)
+ return msg
+}
+
+func (msg *Message) WithTag(tags string) *Message {
+ if msg.Properties == nil {
+ msg.Properties = make(map[string]string)
+ }
+ msg.Properties[PropertyTags] = tags
+ return msg
}
func (msg *Message) String() string {
@@ -100,13 +109,9 @@ func (msg *Message) String() string {
msg.Topic, string(msg.Body), msg.Flag, msg.Properties, msg.TransactionId)
}
-//
-//func (msg *Message) SetTags(tags string) {
-// msg.Properties[tags] = tags
-//}
-
-func (msg *Message) PutProperty(key, value string) {
+func (msg *Message) WithProperty(key, value string) *Message {
msg.Properties[key] = value
+ return msg
}
func (msg *Message) RemoveProperty(key string) string {
@@ -119,13 +124,13 @@ func (msg *Message) RemoveProperty(key string) string {
return value
}
-func (msg *Message) SetKeys(keys []string) {
+func (msg *Message) WithKeys(keys []string) *Message {
var sb strings.Builder
for _, k := range keys {
sb.WriteString(k)
sb.WriteString(PropertyKeySeparator)
}
- msg.PutProperty(PropertyKeys, sb.String())
+ return msg.WithProperty(PropertyKeys, sb.String())
}
func (msg *Message) GetTags() string {