You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/08/30 13:38:30 UTC

[rocketmq-client-go] branch native updated: [ISSUE #180]add tag filter example. resolve #180 (#181)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 6480b41  [ISSUE #180]add tag filter example. resolve #180 (#181)
6480b41 is described below

commit 6480b41543b40353f97937361e1b8c29c7f674f9
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Fri Aug 30 21:38:26 2019 +0800

    [ISSUE #180]add tag filter example. resolve #180 (#181)
---
 consumer/consumer.go                              | 25 ++++++++------
 consumer/consumer_test.go                         |  6 ++--
 consumer/mock_offset_store.go                     | 17 +++++++++
 consumer/push_consumer.go                         |  6 ++--
 examples/{producer/delay => consumer/tag}/main.go | 42 ++++++++++-------------
 examples/producer/delay/main.go                   |  2 +-
 examples/producer/{delay => tag}/main.go          |  8 +++--
 go.mod                                            |  1 +
 internal/model.go                                 | 14 ++++----
 internal/trace.go                                 |  2 +-
 internal/utils/set.go                             | 30 +++++++++++++---
 primitive/message.go                              | 25 ++++++++------
 12 files changed, 112 insertions(+), 66 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 0564eaf..f7fb759 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -27,13 +27,14 @@ import (
 	"sync"
 	"time"
 
+	"github.com/pkg/errors"
+	"github.com/tidwall/gjson"
+
 	"github.com/apache/rocketmq-client-go/internal"
 	"github.com/apache/rocketmq-client-go/internal/remote"
 	"github.com/apache/rocketmq-client-go/internal/utils"
 	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
-	"github.com/pkg/errors"
-	"github.com/tidwall/gjson"
 )
 
 const (
@@ -276,7 +277,6 @@ func (dc *defaultConsumer) start() error {
 		dc.subscriptionDataTable.Store(retryTopic, sub)
 	}
 
-	//dc.client = internal.GetOrNewRocketMQClient(dc.option.ClientOptions, nil)
 	if dc.model == Clustering {
 		dc.option.ChangeInstanceNameToPID()
 		dc.storage = NewRemoteOffsetStore(dc.consumerGroup, dc.client, dc.namesrv)
@@ -799,10 +799,10 @@ func (dc *defaultConsumer) processPullResult(mq *primitive.MessageQueue, result
 
 		// filter message according to tags
 		msgListFilterAgain := msgs
-		if len(data.Tags) > 0 && data.ClassFilterMode {
-			msgListFilterAgain = make([]*primitive.MessageExt, len(msgs))
+		if data.Tags.Len() > 0 && data.ClassFilterMode {
+			msgListFilterAgain = make([]*primitive.MessageExt, 0)
 			for _, msg := range msgs {
-				_, exist := data.Tags[msg.GetTags()]
+				_, exist := data.Tags.Contains(msg.GetTags())
 				if exist {
 					msgListFilterAgain = append(msgListFilterAgain, msg)
 				}
@@ -929,16 +929,19 @@ func buildSubscriptionData(topic string, selector MessageSelector) *internal.Sub
 		subData.ExpType = string(TAG)
 		subData.SubString = _SubAll
 	} else {
-		tags := strings.Split(selector.Expression, "\\|\\|")
+		tags := strings.Split(selector.Expression, "||")
+		subData.Tags = utils.NewSet()
+		subData.Codes = utils.NewSet()
 		for idx := range tags {
 			trimString := strings.Trim(tags[idx], " ")
 			if trimString != "" {
-				if !subData.Tags[trimString] {
-					subData.Tags[trimString] = true
+				if _, ok := subData.Tags.Contains(trimString); !ok {
+					subData.Tags.AddKV(trimString, trimString)
 				}
 				hCode := utils.HashString(trimString)
-				if !subData.Codes[int32(hCode)] {
-					subData.Codes[int32(hCode)] = true
+				v := strconv.Itoa(hCode)
+				if _, ok := subData.Codes.Contains(v); !ok {
+					subData.Codes.AddKV(v, v)
 				}
 			}
 		}
diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go
index 665547d..a419dc7 100644
--- a/consumer/consumer_test.go
+++ b/consumer/consumer_test.go
@@ -70,7 +70,7 @@ func TestDoRebalance(t *testing.T) {
 		dc.namesrv = namesrvCli
 
 		rmqCli := internal.NewMockRMQClient(ctrl)
-		rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).
+		rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
 			Return(&remote.RemotingCommand{
 				Body: []byte("{\"consumerIdList\": [\"a1\", \"a2\", \"a3\"] }"),
 			}, nil)
@@ -127,7 +127,7 @@ func TestComputePullFromWhere(t *testing.T) {
 			broker := "a"
 			namesrvCli.EXPECT().FindBrokerAddrByName(gomock.Any()).Return(broker)
 
-			rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).
+			rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
 				Return(&remote.RemotingCommand{
 					ExtFields: map[string]string{
 						"offset": "20",
@@ -155,7 +155,7 @@ func TestComputePullFromWhere(t *testing.T) {
 			broker := "a"
 			namesrvCli.EXPECT().FindBrokerAddrByName(gomock.Any()).Return(broker)
 
-			rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).
+			rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
 				Return(&remote.RemotingCommand{
 					ExtFields: map[string]string{
 						"offset": "30",
diff --git a/consumer/mock_offset_store.go b/consumer/mock_offset_store.go
index 093c50d..b9fa4cf 100644
--- a/consumer/mock_offset_store.go
+++ b/consumer/mock_offset_store.go
@@ -1,3 +1,20 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
 // Code generated by MockGen. DO NOT EDIT.
 // Source: offset_store.go
 
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index dfee161..cef744a 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -182,8 +182,8 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
 	if pc.option.ConsumerModel == Clustering {
 		// add retry topic for clustering mode
 		retryTopic := internal.GetRetryTopic(pc.consumerGroup)
-		data = buildSubscriptionData(retryTopic, MessageSelector{Expression: _SubAll})
-		pc.subscriptionDataTable.Store(retryTopic, data)
+		retryData := buildSubscriptionData(retryTopic, MessageSelector{Expression: _SubAll})
+		pc.subscriptionDataTable.Store(retryTopic, retryData)
 		pc.subscribedTopic[retryTopic] = ""
 	}
 
@@ -502,7 +502,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			rt := time.Now().Sub(beginTime) / time.Millisecond
 			increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))
 
-			result.SetMessageExts(primitive.DecodeMessage(result.GetBody()))
+			pc.processPullResult(request.mq, result, sd)
 
 			msgFounded := result.GetMessageExts()
 			firstMsgOffset := int64(math.MaxInt64)
diff --git a/examples/producer/delay/main.go b/examples/consumer/tag/main.go
similarity index 58%
copy from examples/producer/delay/main.go
copy to examples/consumer/tag/main.go
index 059507f..4a7cc9c 100644
--- a/examples/producer/delay/main.go
+++ b/examples/consumer/tag/main.go
@@ -21,38 +21,34 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"time"
 
 	"github.com/apache/rocketmq-client-go"
+	"github.com/apache/rocketmq-client-go/consumer"
 	"github.com/apache/rocketmq-client-go/primitive"
-	"github.com/apache/rocketmq-client-go/producer"
 )
 
 func main() {
-	p, _ := rocketmq.NewProducer(
-		producer.WithNameServer([]string{"127.0.0.1:9876"}),
-		producer.WithRetry(2),
+	c, _ := rocketmq.NewPushConsumer(
+		consumer.WithGroupName("testGroup"),
+		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
 	)
-	err := p.Start()
-	if err != nil {
-		fmt.Printf("start producer error: %s", err.Error())
-		os.Exit(1)
+	selector := consumer.MessageSelector{
+		Type:       consumer.TAG,
+		Expression: "TagA || TagC",
 	}
-	for i := 0; i < 10; i++ {
-		msg := &primitive.Message{
-			Topic: "TopicTest",
-			Body:  []byte("Hello RocketMQ Go Client!"),
-		}
-		msg.SetDelayTimeLevel(3)
-		res, err := p.SendSync(context.Background(), msg)
-
-		if err != nil {
-			fmt.Printf("send message error: %s\n", err)
-		} else {
-			fmt.Printf("send message success: result=%s\n", res.String())
-		}
+	err := c.Subscribe("TopicTest", selector, func(ctx context.Context,
+		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
+		fmt.Printf("subscribe callback: %v \n", msgs)
+		return consumer.ConsumeSuccess, nil
+	})
+	if err != nil {
+		fmt.Println(err.Error())
 	}
-	err = p.Shutdown()
+	err = c.Start()
 	if err != nil {
-		fmt.Printf("shundown producer error: %s", err.Error())
+		fmt.Println(err.Error())
+		os.Exit(-1)
 	}
+	time.Sleep(time.Hour)
 }
diff --git a/examples/producer/delay/main.go b/examples/producer/delay/main.go
index 059507f..4466e8d 100644
--- a/examples/producer/delay/main.go
+++ b/examples/producer/delay/main.go
@@ -42,7 +42,7 @@ func main() {
 			Topic: "TopicTest",
 			Body:  []byte("Hello RocketMQ Go Client!"),
 		}
-		msg.SetDelayTimeLevel(3)
+		msg.WithDelayTimeLevel(3)
 		res, err := p.SendSync(context.Background(), msg)
 
 		if err != nil {
diff --git a/examples/producer/delay/main.go b/examples/producer/tag/main.go
similarity index 93%
copy from examples/producer/delay/main.go
copy to examples/producer/tag/main.go
index 059507f..6b22454 100644
--- a/examples/producer/delay/main.go
+++ b/examples/producer/tag/main.go
@@ -37,14 +37,16 @@ func main() {
 		fmt.Printf("start producer error: %s", err.Error())
 		os.Exit(1)
 	}
-	for i := 0; i < 10; i++ {
+	tags := []string{"TagA", "TagB", "TagC"}
+	for i := 0; i < 3; i++ {
+		tag := tags[i%3]
 		msg := &primitive.Message{
 			Topic: "TopicTest",
 			Body:  []byte("Hello RocketMQ Go Client!"),
 		}
-		msg.SetDelayTimeLevel(3)
-		res, err := p.SendSync(context.Background(), msg)
+		msg.WithTag(tag)
 
+		res, err := p.SendSync(context.Background(), msg)
 		if err != nil {
 			fmt.Printf("send message error: %s\n", err)
 		} else {
diff --git a/go.mod b/go.mod
index 7ccdc85..b2704fa 100644
--- a/go.mod
+++ b/go.mod
@@ -11,5 +11,6 @@ require (
 	github.com/tidwall/gjson v1.2.1
 	github.com/tidwall/match v1.0.1 // indirect
 	github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 // indirect
+	golang.org/x/tools v0.0.0-20190425150028-36563e24a262
 	stathat.com/c/consistent v1.0.0
 )
diff --git a/internal/model.go b/internal/model.go
index 86b286e..ad86e2e 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -45,13 +45,13 @@ const (
 )
 
 type SubscriptionData struct {
-	ClassFilterMode bool
-	Topic           string
-	SubString       string
-	Tags            map[string]bool
-	Codes           map[int32]bool
-	SubVersion      int64
-	ExpType         string
+	ClassFilterMode bool      `json:"classFilterMode"`
+	Topic           string    `json:"topic"`
+	SubString       string    `json:"subString"`
+	Tags            utils.Set `json:"tagsSet"`
+	Codes           utils.Set `json:"codeSet"`
+	SubVersion      int64     `json:"subVersion"`
+	ExpType         string    `json:"expressionType"`
 }
 
 type producerData struct {
diff --git a/internal/trace.go b/internal/trace.go
index 8bc6748..3bceefb 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -389,7 +389,7 @@ func (td *traceDispatcher) flush(topic, regionID string, data []TraceTransferBea
 
 func (td *traceDispatcher) sendTraceDataByMQ(keyset Keyset, regionID string, data string) {
 	msg := primitive.NewMessage(td.traceTopic, []byte(data))
-	msg.SetKeys(keyset.slice())
+	msg.WithKeys(keyset.slice())
 
 	mq, addr := td.findMq()
 	if mq == nil {
diff --git a/internal/utils/set.go b/internal/utils/set.go
index 2f9f214..e90fb36 100644
--- a/internal/utils/set.go
+++ b/internal/utils/set.go
@@ -27,6 +27,12 @@ type UniqueItem interface {
 	UniqueID() string
 }
 
+type StringUnique string
+
+func (str StringUnique) UniqueID() string {
+	return string(str)
+}
+
 type Set struct {
 	items map[string]UniqueItem
 }
@@ -41,6 +47,15 @@ func (s *Set) Add(v UniqueItem) {
 	s.items[v.UniqueID()] = v
 }
 
+func (s *Set) AddKV(k, v string) {
+	s.items[k] = StringUnique(v)
+}
+
+func (s *Set) Contains(k string) (UniqueItem, bool) {
+	v, ok := s.items[k]
+	return v, ok
+}
+
 func (s *Set) Len() int {
 	return len(s.items)
 }
@@ -56,11 +71,18 @@ func (s *Set) MarshalJSON() ([]byte, error) {
 	buffer.WriteByte('[')
 	keys := make([]string, 0)
 	for _, k := range s.items {
-		v, err := json.Marshal(k)
-		if err != nil {
-			return nil, err
+		var key string
+		switch kval := k.(type) {
+		case StringUnique:
+			key = "\"" + string(kval) + "\""
+		default:
+			v, err := json.Marshal(k)
+			if err != nil {
+				return nil, err
+			}
+			key = string(v)
 		}
-		keys = append(keys, string(v))
+		keys = append(keys, key)
 	}
 	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
 
diff --git a/primitive/message.go b/primitive/message.go
index 3dd4a0c..254af06 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -85,14 +85,23 @@ func NewMessage(topic string, body []byte) *Message {
 	return msg
 }
 
-// SetDelayTimeLevel set message delay time to consume.
+// WithDelayTimeLevel set message delay time to consume.
 // reference delay level definition: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
 // delay level starts from 1. for example, if we set param level=1, then the delay time is 1s.
-func (msg *Message) SetDelayTimeLevel(level int) {
+func (msg *Message) WithDelayTimeLevel(level int) *Message {
 	if msg.Properties == nil {
 		msg.Properties = make(map[string]string)
 	}
 	msg.Properties[PropertyDelayTimeLevel] = strconv.Itoa(level)
+	return msg
+}
+
+func (msg *Message) WithTag(tags string) *Message {
+	if msg.Properties == nil {
+		msg.Properties = make(map[string]string)
+	}
+	msg.Properties[PropertyTags] = tags
+	return msg
 }
 
 func (msg *Message) String() string {
@@ -100,13 +109,9 @@ func (msg *Message) String() string {
 		msg.Topic, string(msg.Body), msg.Flag, msg.Properties, msg.TransactionId)
 }
 
-//
-//func (msg *Message) SetTags(tags string) {
-//	msg.Properties[tags] = tags
-//}
-
-func (msg *Message) PutProperty(key, value string) {
+func (msg *Message) WithProperty(key, value string) *Message {
 	msg.Properties[key] = value
+	return msg
 }
 
 func (msg *Message) RemoveProperty(key string) string {
@@ -119,13 +124,13 @@ func (msg *Message) RemoveProperty(key string) string {
 	return value
 }
 
-func (msg *Message) SetKeys(keys []string) {
+func (msg *Message) WithKeys(keys []string) *Message {
 	var sb strings.Builder
 	for _, k := range keys {
 		sb.WriteString(k)
 		sb.WriteString(PropertyKeySeparator)
 	}
-	msg.PutProperty(PropertyKeys, sb.String())
+	return msg.WithProperty(PropertyKeys, sb.String())
 }
 
 func (msg *Message) GetTags() string {