You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2019/07/06 13:30:41 UTC
[rocketmq-client-go] branch native updated: [ISSUE #86] Add
Interceptor for producer and consumer. (#85)
This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 3a3f93b [ISSUE #86] Add Interceptor for producer and consumer. (#85)
3a3f93b is described below
commit 3a3f93bf5d18d8e680146c50e76af730e72595fc
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Sat Jul 6 21:30:36 2019 +0800
[ISSUE #86] Add Interceptor for producer and consumer. (#85)
* add interceptor
* add log
* add producer example
* refactor code according to new version
* refactor code according to new version
* add example
* fix nil bug
* delete extra code
* delete test code
* add comment. resolves #86
* rename
* rename
* stash
* stash
* fix bug
* stash
* refactor consumer interceptor
* add example
* add example
* 重构interceptor
* fix typo
* add ctx key
* remove extra code
* add ctx to conusme
* refactor consumer interceptor
* refactor consumer interceptor
* refactor consumer interceptor
* lower case chainInterceptor
* rename println
---
examples/consumer/interceptor/main.go | 76 +++++++++++++++++
examples/consumer/{ => simple}/main.go | 10 +--
examples/producer/{ => interceptor}/main.go | 32 +++++--
examples/producer/{ => simple}/main.go | 8 +-
internal/consumer/consumer.go | 2 +-
internal/consumer/pull_consumer.go | 4 +-
internal/consumer/push_consumer.go | 124 +++++++++++++++++++---------
internal/kernel/client.go | 56 +++++++------
internal/producer/producer.go | 101 ++++++++++++++++++----
primitive/consume.go | 16 +++-
primitive/ctx.go | 60 ++++++++++++++
primitive/interceptor.go | 57 +++++++++++++
primitive/options.go | 99 +++++++++++++++++++++-
primitive/result.go | 18 +++-
14 files changed, 556 insertions(+), 107 deletions(-)
diff --git a/examples/consumer/interceptor/main.go b/examples/consumer/interceptor/main.go
new file mode 100644
index 0000000..01fedc3
--- /dev/null
+++ b/examples/consumer/interceptor/main.go
@@ -0,0 +1,76 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/apache/rocketmq-client-go/internal/consumer"
+ "github.com/apache/rocketmq-client-go/primitive"
+)
+
+// main starts a push consumer for "TopicTest" with two chained user interceptors
+// and keeps the process alive for an hour so that messages can be consumed.
+func main() {
+	c, err := consumer.NewPushConsumer("testGroup", "127.0.0.1:9876",
+		primitive.WithConsumerModel(primitive.Clustering),
+		primitive.WithConsumeFromWhere(primitive.ConsumeFromFirstOffset),
+		primitive.WithChainConsumerInterceptor(UserFistInterceptor(), UserSecondInterceptor()))
+	if err != nil {
+		// NewPushConsumer returns a nil consumer together with the error, so
+		// continuing would call Subscribe on nil.
+		fmt.Println(err.Error())
+		os.Exit(-1)
+	}
+	err = c.Subscribe("TopicTest", primitive.MessageSelector{}, func(ctx *primitive.ConsumeMessageContext,
+		msgs []*primitive.MessageExt) (primitive.ConsumeResult, error) {
+		// Printf, not Println: Println does not interpret the %v directive.
+		fmt.Printf("subscribe callback: %v\n", msgs)
+		return primitive.ConsumeSuccess, nil
+	})
+	if err != nil {
+		fmt.Println(err.Error())
+	}
+	// Note: start after subscribe
+	err = c.Start()
+	if err != nil {
+		fmt.Println(err.Error())
+		os.Exit(-1)
+	}
+	time.Sleep(time.Hour)
+}
+
+// UserFistInterceptor returns the first consumer interceptor used by this example.
+// It prints the consume context and the method name carried by ctx, logs the
+// message batch, delegates to next, and then logs the result holder before
+// returning next's error unchanged.
+func UserFistInterceptor() primitive.CInterceptor {
+	return func(ctx context.Context, req, reply interface{}, next primitive.CInvoker) error {
+		msgCtx, _ := primitive.GetConsumerCtx(ctx)
+		// End the line so this output does not run into the next Printf.
+		fmt.Printf("msgCtx: %v, method: %s\n", msgCtx, primitive.GetMethod(ctx))
+
+		// This example assumes a PushConsumer call, where req is
+		// []*primitive.MessageExt and reply is *primitive.ConsumeResultHolder.
+		msgs := req.([]*primitive.MessageExt)
+		fmt.Printf("user first interceptor before invoke: %v\n", msgs)
+		e := next(ctx, msgs, reply)
+
+		holder := reply.(*primitive.ConsumeResultHolder)
+		fmt.Printf("user first interceptor after invoke: %v, result: %v\n", msgs, holder)
+		return e
+	}
+}
+
+// UserSecondInterceptor returns the second consumer interceptor passed to
+// WithChainConsumerInterceptor in main. It logs the message batch, delegates to
+// next, logs the batch again together with the result holder, and returns next's
+// error unchanged.
+func UserSecondInterceptor() primitive.CInterceptor {
+	return func(ctx context.Context, req, reply interface{}, next primitive.CInvoker) error {
+		batch := req.([]*primitive.MessageExt)
+		fmt.Printf("user second interceptor before invoke: %v\n", batch)
+
+		invokeErr := next(ctx, batch, reply)
+
+		// The assertion is done after next, as the reply is only meaningful then.
+		result := reply.(*primitive.ConsumeResultHolder)
+		fmt.Printf("user second interceptor after invoke: %v, result: %v\n", batch, result)
+		return invokeErr
+	}
+}
diff --git a/examples/consumer/main.go b/examples/consumer/simple/main.go
similarity index 79%
rename from examples/consumer/main.go
rename to examples/consumer/simple/main.go
index f433f73..70bbbd4 100644
--- a/examples/consumer/main.go
+++ b/examples/consumer/simple/main.go
@@ -27,14 +27,10 @@ import (
)
func main() {
- c, _ := consumer.NewPushConsumer("testGroup", primitive.ConsumerOption{
- NameServerAddr: "127.0.0.1:9876",
- ConsumerModel: primitive.Clustering,
- FromWhere: primitive.ConsumeFromFirstOffset,
- })
- err := c.Subscribe("test", primitive.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext,
+ c, _ := consumer.NewPushConsumer("testGroup", "127.0.0.1:9876")
+ err := c.Subscribe("TopicTest", primitive.MessageSelector{}, func(ctx *primitive.ConsumeMessageContext,
msgs []*primitive.MessageExt) (primitive.ConsumeResult, error) {
- fmt.Println(msgs)
+ fmt.Println("subscribe callback: %v", msgs)
return primitive.ConsumeSuccess, nil
})
if err != nil {
diff --git a/examples/producer/main.go b/examples/producer/interceptor/main.go
similarity index 56%
copy from examples/producer/main.go
copy to examples/producer/interceptor/main.go
index b52b266..f7bcf7a 100644
--- a/examples/producer/main.go
+++ b/examples/producer/interceptor/main.go
@@ -15,6 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
+// Package main implements a producer with user custom interceptor.
package main
import (
@@ -27,19 +28,18 @@ import (
)
func main() {
- opt := primitive.ProducerOptions{
- NameServerAddr: "127.0.0.1:9876",
- RetryTimesWhenSendFailed: 2,
- }
- p, _ := producer.NewProducer(opt)
+ nameServerAddr := "127.0.0.1:9876"
+ p, _ := producer.NewProducer(nameServerAddr, primitive.WithRetry(2),
+ primitive.WithChainProducerInterceptor(UserFirstInterceptor(), UserSecondInterceptor()))
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
- for i := 0; i < 1000; i++ {
+ for i := 0; i < 10; i++ {
res, err := p.SendSync(context.Background(), &primitive.Message{
- Topic: "test",
+ //Topic: "test",
+ Topic: "TopicTest",
Body: []byte("Hello RocketMQ Go Client!"),
})
@@ -54,3 +54,21 @@ func main() {
fmt.Printf("shundown producer error: %s", err.Error())
}
}
+
+// UserFirstInterceptor returns a producer interceptor that prints the request and
+// reply, delegates to next, prints them again, and returns next's error unchanged.
+func UserFirstInterceptor() primitive.PInterceptor {
+	logAround := func(ctx context.Context, req, reply interface{}, next primitive.PInvoker) error {
+		fmt.Printf("user first interceptor before invoke: req:%v, reply: %v\n", req, reply)
+		invokeErr := next(ctx, req, reply)
+		fmt.Printf("user first interceptor after invoke: req: %v, reply: %v \n", req, reply)
+		return invokeErr
+	}
+	return logAround
+}
+
+// UserSecondInterceptor returns a producer interceptor that prints the request and
+// reply, delegates to next, prints them again, and returns next's error unchanged.
+func UserSecondInterceptor() primitive.PInterceptor {
+	logAround := func(ctx context.Context, req, reply interface{}, next primitive.PInvoker) error {
+		fmt.Printf("user second interceptor before invoke: req: %v, reply: %v\n", req, reply)
+		invokeErr := next(ctx, req, reply)
+		fmt.Printf("user second interceptor after invoke: req: %v, reply: %v \n", req, reply)
+		return invokeErr
+	}
+	return logAround
+}
diff --git a/examples/producer/main.go b/examples/producer/simple/main.go
similarity index 89%
rename from examples/producer/main.go
rename to examples/producer/simple/main.go
index b52b266..39c885d 100644
--- a/examples/producer/main.go
+++ b/examples/producer/simple/main.go
@@ -26,12 +26,10 @@ import (
"github.com/apache/rocketmq-client-go/primitive"
)
+// Package main implements a simple producer to send message.
func main() {
- opt := primitive.ProducerOptions{
- NameServerAddr: "127.0.0.1:9876",
- RetryTimesWhenSendFailed: 2,
- }
- p, _ := producer.NewProducer(opt)
+ nameServerAddr := "127.0.0.1:9876"
+ p, _ := producer.NewProducer(nameServerAddr, primitive.WithRetry(2))
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
diff --git a/internal/consumer/consumer.go b/internal/consumer/consumer.go
index e9aa1b0..ecbbbb4 100644
--- a/internal/consumer/consumer.go
+++ b/internal/consumer/consumer.go
@@ -99,7 +99,7 @@ type defaultConsumer struct {
state kernel.ServiceState
pause bool
once sync.Once
- option primitive.ConsumerOption
+ option primitive.ConsumerOptions
// key: int, hash(*primitive.MessageQueue)
// value: *processQueue
processQueueTable sync.Map
diff --git a/internal/consumer/pull_consumer.go b/internal/consumer/pull_consumer.go
index ead633f..2338199 100644
--- a/internal/consumer/pull_consumer.go
+++ b/internal/consumer/pull_consumer.go
@@ -38,7 +38,7 @@ var (
queueCounterTable sync.Map
)
-func NewConsumer(config primitive.ConsumerOption) *defaultPullConsumer {
+func NewConsumer(config primitive.ConsumerOptions) *defaultPullConsumer {
return &defaultPullConsumer{
option: config,
}
@@ -46,7 +46,7 @@ func NewConsumer(config primitive.ConsumerOption) *defaultPullConsumer {
type defaultPullConsumer struct {
state kernel.ServiceState
- option primitive.ConsumerOption
+ option primitive.ConsumerOptions
client *kernel.RMQClient
GroupName string
Model primitive.MessageModel
diff --git a/internal/consumer/push_consumer.go b/internal/consumer/push_consumer.go
index be839bc..8acbc4c 100644
--- a/internal/consumer/push_consumer.go
+++ b/internal/consumer/push_consumer.go
@@ -49,46 +49,51 @@ type PushConsumer interface {
Start() error
Shutdown()
Subscribe(topic string, selector primitive.MessageSelector,
- f func(*ConsumeMessageContext, []*primitive.MessageExt) (primitive.ConsumeResult, error)) error
+ f func(*primitive.ConsumeMessageContext, []*primitive.MessageExt) (primitive.ConsumeResult, error)) error
}
type pushConsumer struct {
*defaultConsumer
queueFlowControlTimes int
queueMaxSpanFlowControlTimes int
- consume func(*ConsumeMessageContext, []*primitive.MessageExt) (primitive.ConsumeResult, error)
+ consume func(*primitive.ConsumeMessageContext, []*primitive.MessageExt) (primitive.ConsumeResult, error)
submitToConsume func(*processQueue, *primitive.MessageQueue)
subscribedTopic map[string]string
+
+ interceptor primitive.CInterceptor
}
-func NewPushConsumer(consumerGroup string, opt primitive.ConsumerOption) (PushConsumer, error) {
- if err := utils.VerifyIP(opt.NameServerAddr); err != nil {
+func NewPushConsumer(consumerGroup string, nameServerAddr string, opts ...*primitive.ConsumerOption) (PushConsumer, error) {
+ if err := utils.VerifyIP(nameServerAddr); err != nil {
return nil, err
}
- opt.InstanceName = "DEFAULT"
- opt.ClientIP = utils.LocalIP()
- if opt.NameServerAddr == "" {
- rlog.Fatal("opt.NameServerAddr can't be empty")
+ if nameServerAddr == "" {
+ rlog.Fatal("opts.NameServerAddr can't be empty")
}
- err := os.Setenv(kernel.EnvNameServerAddr, opt.NameServerAddr)
+ err := os.Setenv(kernel.EnvNameServerAddr, nameServerAddr)
if err != nil {
rlog.Fatal("set env=EnvNameServerAddr error: %s ", err.Error())
}
+
+ pushOpts := primitive.DefaultPushConsumerOptions()
+ for _, op := range opts {
+ op.Apply(&pushOpts)
+ }
+
+ pushOpts.NameServerAddr = nameServerAddr
+
dc := &defaultConsumer{
consumerGroup: consumerGroup,
cType: _PushConsume,
state: kernel.StateCreateJust,
prCh: make(chan PullRequest, 4),
- model: opt.ConsumerModel,
- consumeOrderly: opt.ConsumeOrderly,
- fromWhere: opt.FromWhere,
- option: opt,
+ model: pushOpts.ConsumerModel,
+ consumeOrderly: pushOpts.ConsumeOrderly,
+ fromWhere: pushOpts.FromWhere,
+ allocate: pushOpts.Strategy,
+ option: pushOpts,
}
- if opt.Strategy == nil {
- opt.Strategy = primitive.AllocateByAveragely
- }
- dc.allocate = opt.Strategy
p := &pushConsumer{
defaultConsumer: dc,
subscribedTopic: make(map[string]string, 0),
@@ -99,9 +104,37 @@ func NewPushConsumer(consumerGroup string, opt primitive.ConsumerOption) (PushCo
} else {
p.submitToConsume = p.consumeMessageCurrently
}
+
+ chainInterceptor(p)
+
return p, nil
}
+// chainInterceptor chain list of interceptor as one interceptor
+func chainInterceptor(p *pushConsumer) {
+	chain := p.option.Interceptors
+
+	// No interceptor configured: the consumer calls the callback directly.
+	if len(chain) == 0 {
+		p.interceptor = nil
+		return
+	}
+
+	// A single interceptor needs no wrapping.
+	if len(chain) == 1 {
+		p.interceptor = chain[0]
+		return
+	}
+
+	// Several interceptors: the first one is the outermost; the invoker it
+	// receives walks the remaining interceptors before reaching the final invoker.
+	p.interceptor = func(ctx context.Context, req, reply interface{}, invoker primitive.CInvoker) error {
+		rest := getChainedInterceptor(chain, 0, invoker)
+		return chain[0](ctx, req, reply, rest)
+	}
+}
+
+// getChainedInterceptor returns the invoker to hand to interceptors[cur]. For the
+// last interceptor that is finalInvoker itself; otherwise it is an invoker that
+// runs interceptors[cur+1] with the invoker built for that position.
+func getChainedInterceptor(interceptors []primitive.CInterceptor, cur int, finalInvoker primitive.CInvoker) primitive.CInvoker {
+	following := cur + 1
+	if following == len(interceptors) {
+		return finalInvoker
+	}
+	return func(ctx context.Context, req, reply interface{}) error {
+		// The rest of the chain is built lazily, each time this invoker is called.
+		rest := getChainedInterceptor(interceptors, following, finalInvoker)
+		return interceptors[following](ctx, req, reply, rest)
+	}
+}
+
func (pc *pushConsumer) Start() error {
var err error
pc.once.Do(func() {
@@ -164,7 +197,7 @@ func (pc *pushConsumer) Start() error {
func (pc *pushConsumer) Shutdown() {}
func (pc *pushConsumer) Subscribe(topic string, selector primitive.MessageSelector,
- f func(*ConsumeMessageContext, []*primitive.MessageExt) (primitive.ConsumeResult, error)) error {
+ f func(*primitive.ConsumeMessageContext, []*primitive.MessageExt) (primitive.ConsumeResult, error)) error {
if pc.state != kernel.StateCreateJust {
return errors.New("subscribe topic only started before")
}
@@ -424,6 +457,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
+
result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
if err != nil {
rlog.Warnf("pull message from %s error: %s", brokerResult.BrokerAddr, err.Error())
@@ -447,6 +481,8 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
rt := time.Now().Sub(beginTime)
increasePullRT(pc.consumerGroup, request.mq.Topic, rt)
+ result.SetMessageExts(primitive.DecodeMessage(result.GetBody()))
+
msgFounded := result.GetMessageExts()
firstMsgOffset := int64(math.MaxInt64)
if msgFounded != nil && len(msgFounded) != 0 {
@@ -485,7 +521,7 @@ func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) {
// TODO
}
-func (pc *pushConsumer) sendMessageBack(ctx *ConsumeMessageContext, msg *primitive.MessageExt) bool {
+func (pc *pushConsumer) sendMessageBack(ctx *primitive.ConsumeMessageContext, msg *primitive.MessageExt) bool {
return true
}
@@ -570,16 +606,6 @@ func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq *primitive.MessageQueue
return true
}
-type ConsumeMessageContext struct {
- consumerGroup string
- msgs []*primitive.MessageExt
- mq *primitive.MessageQueue
- success bool
- status string
- // mqTractContext
- properties map[string]string
-}
-
func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.MessageQueue) {
msgs := pq.getMessages()
if msgs == nil {
@@ -603,8 +629,8 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
return
}
- ctx := &ConsumeMessageContext{
- properties: make(map[string]string),
+ msgCtx := &primitive.ConsumeMessageContext{
+ Properties: make(map[string]string),
}
// TODO hook
beginTime := time.Now()
@@ -620,16 +646,40 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
beginTime.UnixNano()/int64(time.Millisecond), 10)
}
}
- result, err := pc.consume(ctx, subMsgs)
+ var result primitive.ConsumeResult
+
+ var err error
+ if pc.interceptor == nil {
+ result, err = pc.consume(msgCtx, subMsgs)
+ } else {
+ var container primitive.ConsumeResultHolder
+
+ ctx := context.Background()
+ ctx = primitive.WithConsumerCtx(ctx, msgCtx)
+ ctx = primitive.WithMehod(ctx, primitive.ConsumerPush)
+
+ err = pc.interceptor(ctx, subMsgs, &container, func(ctx context.Context, req, reply interface{}) error {
+ consumerCtx, _ := primitive.GetConsumerCtx(ctx)
+
+ msgs := req.([]*primitive.MessageExt)
+ r, e := pc.consume(consumerCtx, msgs)
+
+ realReply := reply.(*primitive.ConsumeResultHolder)
+ realReply.ConsumeResult = r
+ return e
+ })
+ result = container.ConsumeResult
+ }
+
consumeRT := time.Now().Sub(beginTime)
if err != nil {
- ctx.properties["ConsumeContextType"] = "EXCEPTION"
+ msgCtx.Properties["ConsumeContextType"] = "EXCEPTION"
} else if consumeRT >= pc.option.ConsumeTimeout {
- ctx.properties["ConsumeContextType"] = "TIMEOUT"
+ msgCtx.Properties["ConsumeContextType"] = "TIMEOUT"
} else if result == primitive.ConsumeSuccess {
- ctx.properties["ConsumeContextType"] = "SUCCESS"
+ msgCtx.Properties["ConsumeContextType"] = "SUCCESS"
} else {
- ctx.properties["ConsumeContextType"] = "RECONSUME_LATER"
+ msgCtx.Properties["ConsumeContextType"] = "RECONSUME_LATER"
}
// TODO hook
@@ -648,7 +698,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
} else {
for i := 0; i < len(msgs); i++ {
msg := msgs[i]
- if !pc.sendMessageBack(ctx, msg) {
+ if !pc.sendMessageBack(msgCtx, msg) {
msg.ReconsumeTimes += 1
msgBackFailed = append(msgBackFailed, msg)
}
diff --git a/internal/kernel/client.go b/internal/kernel/client.go
index 4687989..94f45c9 100644
--- a/internal/kernel/client.go
+++ b/internal/kernel/client.go
@@ -292,7 +292,7 @@ func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, r
return nil, err
}
-func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, msgs ...*primitive.Message) *primitive.SendResult {
+func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) {
var status primitive.SendStatus
switch cmd.Code {
case ResFlushDiskTimeout:
@@ -321,20 +321,20 @@ func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
qId, _ := strconv.Atoi(cmd.ExtFields["queueId"])
off, _ := strconv.ParseInt(cmd.ExtFields["queueOffset"], 10, 64)
- return &primitive.SendResult{
- Status: status,
- MsgID: cmd.ExtFields["msgId"],
- OffsetMsgID: cmd.ExtFields["msgId"],
- MessageQueue: &primitive.MessageQueue{
- Topic: msgs[0].Topic,
- BrokerName: brokerName,
- QueueId: qId,
- },
- QueueOffset: off,
- //TransactionID: sendResponse.TransactionId,
- RegionID: regionId,
- TraceOn: trace != "" && trace != _TranceOff,
+
+ resp.Status = status
+ resp.MsgID = cmd.ExtFields["msgId"]
+ resp.OffsetMsgID = cmd.ExtFields["msgId"]
+ resp.MessageQueue = &primitive.MessageQueue{
+ Topic: msgs[0].Topic,
+ BrokerName: brokerName,
+ QueueId: qId,
}
+ resp.QueueOffset = off
+ //TransactionID: sendResponse.TransactionId,
+ resp.RegionID = regionId
+ resp.TraceOn = trace != "" && trace != _TranceOff
+
}
// PullMessage with sync
@@ -349,6 +349,7 @@ func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, request
}
func (c *RMQClient) processPullResponse(response *remote.RemotingCommand) (*primitive.PullResult, error) {
+
pullResult := &primitive.PullResult{}
switch response.Code {
case ResSuccess:
@@ -363,29 +364,32 @@ func (c *RMQClient) processPullResponse(response *remote.RemotingCommand) (*prim
return nil, fmt.Errorf("unknown Response Code: %d, remark: %s", response.Code, response.Remark)
}
- v, exist := response.ExtFields["maxOffset"]
+ c.decodeCommandCustomHeader(pullResult, response)
+ pullResult.SetBody(response.Body)
+
+ return pullResult, nil
+}
+
+func (c *RMQClient) decodeCommandCustomHeader(pr *primitive.PullResult, cmd *remote.RemotingCommand) {
+ v, exist := cmd.ExtFields["maxOffset"]
if exist {
- pullResult.MaxOffset, _ = strconv.ParseInt(v, 10, 64)
+ pr.MaxOffset, _ = strconv.ParseInt(v, 10, 64)
}
- v, exist = response.ExtFields["minOffset"]
+ v, exist = cmd.ExtFields["minOffset"]
if exist {
- pullResult.MinOffset, _ = strconv.ParseInt(v, 10, 64)
+ pr.MinOffset, _ = strconv.ParseInt(v, 10, 64)
}
- v, exist = response.ExtFields["nextBeginOffset"]
+ v, exist = cmd.ExtFields["nextBeginOffset"]
if exist {
- pullResult.NextBeginOffset, _ = strconv.ParseInt(v, 10, 64)
+ pr.NextBeginOffset, _ = strconv.ParseInt(v, 10, 64)
}
- v, exist = response.ExtFields["suggestWhichBrokerId"]
+ v, exist = cmd.ExtFields["suggestWhichBrokerId"]
if exist {
- pullResult.SuggestWhichBrokerId, _ = strconv.ParseInt(v, 10, 64)
+ pr.SuggestWhichBrokerId, _ = strconv.ParseInt(v, 10, 64)
}
-
- //pullResult.messageExts = decodeMessage(response.Body) TODO parse in top
-
- return pullResult, nil
}
// PullMessageAsync pull message async
diff --git a/internal/producer/producer.go b/internal/producer/producer.go
index 91bd389..fbcf5ee 100644
--- a/internal/producer/producer.go
+++ b/internal/producer/producer.go
@@ -40,25 +40,59 @@ type Producer interface {
SendOneWay(context.Context, *primitive.Message) error
}
-func NewProducer(opt primitive.ProducerOptions) (Producer, error) {
- if err := utils.VerifyIP(opt.NameServerAddr); err != nil {
+func NewProducer(nameServerAddr string, opts ...*primitive.ProducerOption) (Producer, error) {
+ if err := utils.VerifyIP(nameServerAddr); err != nil {
return nil, err
}
- if opt.RetryTimesWhenSendFailed == 0 {
- opt.RetryTimesWhenSendFailed = 2
- }
- if opt.NameServerAddr == "" {
- rlog.Fatal("opt.NameServerAddr can't be empty")
+
+ if nameServerAddr == "" {
+ rlog.Fatal("nameServerAddr can't be empty")
}
- err := os.Setenv(kernel.EnvNameServerAddr, opt.NameServerAddr)
+ err := os.Setenv(kernel.EnvNameServerAddr, nameServerAddr)
if err != nil {
rlog.Fatal("set env=EnvNameServerAddr error: %s ", err.Error())
}
- return &defaultProducer{
+
+ popts := primitive.DefaultProducerOptions()
+ for _, opt := range opts {
+ opt.Apply(&popts)
+ }
+ popts.NameServerAddr = nameServerAddr
+
+ producer := &defaultProducer{
group: "default",
- client: kernel.GetOrNewRocketMQClient(opt.ClientOption),
- options: opt,
- }, nil
+ client: kernel.GetOrNewRocketMQClient(popts.ClientOption),
+ options: popts,
+ }
+
+ chainInterceptor(producer)
+
+ return producer, nil
+}
+
+// chainInterceptor folds the producer's configured interceptors into the single
+// p.interceptor field: nil when none are configured, the interceptor itself when
+// there is exactly one, and otherwise a wrapper in which the first interceptor is
+// the outermost.
+func chainInterceptor(p *defaultProducer) {
+	chain := p.options.Interceptors
+
+	if len(chain) == 0 {
+		p.interceptor = nil
+		return
+	}
+
+	if len(chain) == 1 {
+		p.interceptor = chain[0]
+		return
+	}
+
+	p.interceptor = func(ctx context.Context, req, reply interface{}, invoker primitive.PInvoker) error {
+		rest := getChainedInterceptor(chain, 0, invoker)
+		return chain[0](ctx, req, reply, rest)
+	}
+}
+
+// getChainedInterceptor returns the invoker to hand to interceptors[cur]. For the
+// last interceptor that is finalInvoker itself; otherwise it is an invoker that
+// runs interceptors[cur+1] with the invoker built for that position.
+func getChainedInterceptor(interceptors []primitive.PInterceptor, cur int, finalInvoker primitive.PInvoker) primitive.PInvoker {
+	following := cur + 1
+	if following == len(interceptors) {
+		return finalInvoker
+	}
+	return func(ctx context.Context, req, reply interface{}) error {
+		// The rest of the chain is built lazily, each time this invoker is called.
+		rest := getChainedInterceptor(interceptors, following, finalInvoker)
+		return interceptors[following](ctx, req, reply, rest)
+	}
}
type defaultProducer struct {
@@ -67,6 +101,8 @@ type defaultProducer struct {
state kernel.ServiceState
options primitive.ProducerOptions
publishInfo sync.Map
+
+ interceptor primitive.PInterceptor
}
func (p *defaultProducer) Start() error {
@@ -89,11 +125,31 @@ func (p *defaultProducer) SendSync(ctx context.Context, msg *primitive.Message)
return nil, errors.New("topic is nil")
}
+ resp := new(primitive.SendResult)
+ if p.interceptor != nil {
+ primitive.WithMehod(ctx, primitive.SendSync)
+ err := p.interceptor(ctx, msg, resp, func(ctx context.Context, req, reply interface{}) error {
+ var err error
+ realReq := req.(*primitive.Message)
+ realReply := reply.(*primitive.SendResult)
+ err = p.sendSync(ctx, realReq, realReply)
+ return err
+ })
+ return resp, err
+ }
+
+ p.sendSync(ctx, msg, resp)
+ return resp, nil
+}
+
+func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, resp *primitive.SendResult) error {
+
retryTime := 1 + p.options.RetryTimesWhenSendFailed
var (
err error
)
+
for retryCount := 0; retryCount < retryTime; retryCount++ {
mq := p.selectMessageQueue(msg.Topic)
if mq == nil {
@@ -103,7 +159,7 @@ func (p *defaultProducer) SendSync(ctx context.Context, msg *primitive.Message)
addr := kernel.FindBrokerAddrByName(mq.BrokerName)
if addr == "" {
- return nil, fmt.Errorf("topic=%s route info not found", mq.Topic)
+ return fmt.Errorf("topic=%s route info not found", mq.Topic)
}
res, _err := p.client.InvokeSync(addr, p.buildSendRequest(mq, msg), 3*time.Second)
@@ -111,9 +167,10 @@ func (p *defaultProducer) SendSync(ctx context.Context, msg *primitive.Message)
err = _err
continue
}
- return p.client.ProcessSendResponse(mq.BrokerName, res, msg), nil
+ p.client.ProcessSendResponse(mq.BrokerName, res, resp, msg)
+ return nil
}
- return nil, err
+ return err
}
func (p *defaultProducer) SendOneWay(ctx context.Context, msg *primitive.Message) error {
@@ -125,6 +182,17 @@ func (p *defaultProducer) SendOneWay(ctx context.Context, msg *primitive.Message
return errors.New("topic is nil")
}
+ if p.interceptor != nil {
+ primitive.WithMehod(ctx, primitive.SendOneway)
+ return p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error {
+ return p.SendOneWay(ctx, msg)
+ })
+ }
+
+ return p.sendOneWay(ctx, msg)
+}
+
+func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message) error {
retryTime := 1 + p.options.RetryTimesWhenSendFailed
var (
@@ -151,7 +219,8 @@ func (p *defaultProducer) SendOneWay(ctx context.Context, msg *primitive.Message
return err
}
-func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue, msg *primitive.Message) *remote.RemotingCommand {
+func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue,
+ msg *primitive.Message) *remote.RemotingCommand {
req := &kernel.SendMessageRequest{
ProducerGroup: p.group,
Topic: mq.Topic,
diff --git a/primitive/consume.go b/primitive/consume.go
index 40e89b9..5edd461 100644
--- a/primitive/consume.go
+++ b/primitive/consume.go
@@ -4,7 +4,7 @@ package primitive
// </p>
//
// RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with
-// the same {@link #consumerGroup} would only consume shards of the messages subscribed, which achieves load
+// the same {@link #ConsumerGroup} would only consume shards of the messages subscribed, which achieves load
// balances; Conversely, if the broadcasting is set, each consumer client will consume all subscribed messages
// separately.
// </p>
@@ -126,3 +126,17 @@ const (
ConsumeSuccess ConsumeResult = iota
ConsumeRetryLater
)
+
+// ConsumeMessageContext is the per-consume context that the push consumer passes
+// to the subscribe callback and, via WithConsumerCtx, to consumer interceptors.
+//
+// NOTE(review): in the push-consumer code of this commit only Properties is
+// initialised and written; confirm where the other fields are populated before
+// relying on them.
+type ConsumeMessageContext struct {
+ ConsumerGroup string
+ Msgs []*MessageExt
+ MQ *MessageQueue
+ Success bool
+ Status string
+ // mqTractContext
+ // Properties holds string key/value attributes. After a consume call the push
+ // consumer stores the outcome under "ConsumeContextType" ("EXCEPTION",
+ // "TIMEOUT", "SUCCESS" or "RECONSUME_LATER").
+ Properties map[string]string
+}
+
+// ConsumeResultHolder wraps a ConsumeResult so that it can be passed by pointer
+// as the reply of a consumer interceptor chain: the final invoker stores the
+// callback's result in it and the push consumer reads it back afterwards.
+type ConsumeResultHolder struct {
+ ConsumeResult
+}
\ No newline at end of file
diff --git a/primitive/ctx.go b/primitive/ctx.go
new file mode 100644
index 0000000..e74f91a
--- /dev/null
+++ b/primitive/ctx.go
@@ -0,0 +1,60 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+/*
+ * Define the ctx key and value type.
+ */
+package primitive
+
+import "context"
+
+// CtxKey is the type of the keys under which this package stores values in a
+// context.Context.
+type CtxKey int
+
+// Keys used with context.WithValue by the helpers in this file.
+const (
+	method CtxKey = iota
+	msgCtx
+)
+
+// Method names stored in a context to tell interceptors which call is running.
+const (
+	// method name in producer
+	SendSync   = "SendSync"
+	SendOneway = "SendOneway"
+	// method name in consumer
+	ConsumerPush = "ConsumerPush"
+	ConsumerPull = "ConsumerPull"
+)
+
+// WithMethod returns a copy of ctx that carries the call method name m.
+// Contexts are immutable, so the returned context must be used; the ctx passed in
+// is left unchanged.
+func WithMethod(ctx context.Context, m string) context.Context {
+	return context.WithValue(ctx, method, m)
+}
+
+// WithMehod is the original, misspelled name of WithMethod and behaves identically.
+//
+// Deprecated: use WithMethod instead.
+func WithMehod(ctx context.Context, m string) context.Context {
+	return WithMethod(ctx, m)
+}
+
+// GetMethod returns the call method name stored in ctx by WithMehod, or the empty
+// string when ctx carries no method name.
+func GetMethod(ctx context.Context) string {
+	// ctx.Value returns nil when the key is absent; a bare type assertion would
+	// panic in that case, so use the comma-ok form and fall back to "".
+	m, _ := ctx.Value(method).(string)
+	return m
+}
+
+// WithConsumerCtx returns a context derived from ctx that carries the given
+// ConsumeMessageContext. The push consumer uses it before running interceptors.
+func WithConsumerCtx(ctx context.Context, c *ConsumeMessageContext) context.Context {
+	derived := context.WithValue(ctx, msgCtx, c)
+	return derived
+}
+
+// GetConsumerCtx returns the ConsumeMessageContext stored in ctx by
+// WithConsumerCtx. The boolean reports whether one was present; it is false, with
+// a nil pointer, when ctx carries none.
+func GetConsumerCtx(ctx context.Context) (*ConsumeMessageContext, bool) {
+	if stored, ok := ctx.Value(msgCtx).(*ConsumeMessageContext); ok {
+		return stored, true
+	}
+	return nil, false
+}
diff --git a/primitive/interceptor.go b/primitive/interceptor.go
new file mode 100644
index 0000000..cb86d19
--- /dev/null
+++ b/primitive/interceptor.go
@@ -0,0 +1,57 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package primitive
+
+import (
+ "context"
+)
+
+// PInvoker completes a send invoke on the producer. An interceptor calls it to
+// pass control to the next interceptor in the chain or, at the end of the chain,
+// to the producer's own send.
+//
+// In the producer of this commit SendSync passes a *Message as req and a
+// *SendResult as reply, while SendOneWay passes a nil reply; use a type assertion
+// to get the real type.
+type PInvoker func(ctx context.Context, req, reply interface{}) error
+
+// PInterceptor intercepts the execution of a send invoke on the producer. It
+// receives the same ctx, req and reply as the send and must call next to let the
+// invoke proceed; the error it returns is the error reported to the caller.
+type PInterceptor func(ctx context.Context, req, reply interface{}, next PInvoker) error
+
+// RetryPInterceptor returns a producer interceptor reserved for retrying failed
+// sends. Retry logic is not implemented yet, so for now it forwards the invoke to
+// next and returns next's error unchanged.
+func RetryPInterceptor() PInterceptor {
+	return func(ctx context.Context, req, reply interface{}, next PInvoker) error {
+		// TODO: retry when next fails. Until then pass through: returning nil
+		// without calling next would drop the send and still report success.
+		return next(ctx, req, reply)
+	}
+}
+
+// TimeoutPInterceptor returns a producer interceptor reserved for enforcing an
+// operation timeout. Timeout handling is not implemented yet, so for now it
+// forwards the invoke to next and returns next's error unchanged.
+func TimeoutPInterceptor() PInterceptor {
+	return func(ctx context.Context, req, reply interface{}, next PInvoker) error {
+		// TODO: add the timeout. Until then pass through: returning nil without
+		// calling next would drop the send and still report success.
+		return next(ctx, req, reply)
+	}
+}
+
+// LogPInterceptor returns a producer interceptor reserved for logging send
+// invokes. Logging is not implemented yet, so for now it forwards the invoke to
+// next and returns next's error unchanged.
+func LogPInterceptor() PInterceptor {
+	return func(ctx context.Context, req, reply interface{}, next PInvoker) error {
+		// TODO: log the invoke. Until then pass through: returning nil without
+		// calling next would drop the send and still report success.
+		return next(ctx, req, reply)
+	}
+}
+
+// CInvoker completes a consume invoke on the consumer. An interceptor calls it to
+// pass control to the next interceptor in the chain or, at the end of the chain,
+// to the user's consume callback.
+//
+// In a PushConsumer call req is of type []*MessageExt and reply is of type
+// *ConsumeResultHolder; use a type assertion to get the real type.
+type CInvoker func(ctx context.Context, req, reply interface{}) error
+
+// CInterceptor intercepts the invoke of a consume on messages. It must call next
+// to let the consume proceed; the error it returns is the error seen by the
+// consumer.
+//
+// In a PushConsumer call req is of type []*MessageExt and reply is of type
+// *ConsumeResultHolder; use a type assertion to get the real type.
+type CInterceptor func(ctx context.Context, req, reply interface{}, next CInvoker) error
diff --git a/primitive/options.go b/primitive/options.go
index b657ac0..0e40415 100644
--- a/primitive/options.go
+++ b/primitive/options.go
@@ -22,9 +22,13 @@ import (
"os"
"strconv"
"time"
+
+ "github.com/apache/rocketmq-client-go/utils"
)
type ProducerOptions struct {
+ Interceptors []PInterceptor
+
ClientOption
NameServerAddr string
GroupName string
@@ -32,7 +36,48 @@ type ProducerOptions struct {
UnitMode bool
}
-type ConsumerOption struct {
+func DefaultProducerOptions() ProducerOptions {
+ return ProducerOptions{
+ RetryTimesWhenSendFailed: 2,
+ }
+}
+
+// ProducerOption configures how the producer is created by setting ProducerOptions values.
+type ProducerOption struct {
+ Apply func(*ProducerOptions)
+}
+
+func NewProducerOption(f func(options *ProducerOptions)) *ProducerOption {
+ return &ProducerOption{
+ Apply: f,
+ }
+}
+
+// WithProducerInterceptor returns a ProducerOption that specifies the interceptor for producer.
+func WithProducerInterceptor(f PInterceptor) *ProducerOption {
+ return NewProducerOption(func(options *ProducerOptions) {
+ options.Interceptors = append(options.Interceptors, f)
+ })
+}
+
+// WithChainProducerInterceptor returns a ProducerOption that specifies the chained interceptor for producer.
+// The first interceptor will be the outer most, while the last interceptor will be the inner most wrapper
+// around the real call.
+func WithChainProducerInterceptor(fs ...PInterceptor) *ProducerOption {
+ return NewProducerOption(func(options *ProducerOptions) {
+ options.Interceptors = append(options.Interceptors, fs...)
+ })
+}
+
+// WithRetry returns a ProducerOption that specifies the retry times when a send fails.
+// TODO: use a retry middleware instead.
+func WithRetry(retries int) *ProducerOption {
+ return NewProducerOption(func(options *ProducerOptions) {
+ options.RetryTimesWhenSendFailed = retries
+ })
+}
+
+type ConsumerOptions struct {
ClientOption
NameServerAddr string
@@ -92,7 +137,7 @@ type ConsumerOption struct {
// Max re-consume times. -1 means 16 times.
//
- // If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion
+ // If messages are re-consumed more than {@link #maxReconsumeTimes} times before success, they will be directed to a deletion
// queue waiting.
MaxReconsumeTimes int
@@ -107,6 +152,56 @@ type ConsumerOption struct {
ConsumeOrderly bool
FromWhere ConsumeFromWhere
// TODO traceDispatcher
+
+ Interceptors []CInterceptor
+}
+
+func DefaultPushConsumerOptions() ConsumerOptions{
+ return ConsumerOptions{
+ ClientOption: ClientOption{
+ InstanceName: "DEFAULT",
+ ClientIP: utils.LocalIP(),
+ },
+ Strategy: AllocateByAveragely,
+ }
+}
+
+type ConsumerOption struct {
+ Apply func(*ConsumerOptions)
+}
+
+func NewConsumerOption(f func(*ConsumerOptions)) *ConsumerOption {
+ return &ConsumerOption{
+ Apply: f,
+ }
+}
+
+func WithConsumerModel(m MessageModel) *ConsumerOption {
+ return NewConsumerOption(func(options *ConsumerOptions) {
+ options.ConsumerModel = m
+ })
+}
+
+func WithConsumeFromWhere(w ConsumeFromWhere) *ConsumerOption{
+ return NewConsumerOption(func(options *ConsumerOptions) {
+ options.FromWhere = w
+ })
+}
+
+// WithConsumerInterceptor returns a ConsumerOption that specifies the interceptor for consumer.
+func WithConsumerInterceptor(f CInterceptor) *ConsumerOption {
+ return NewConsumerOption(func(options *ConsumerOptions) {
+ options.Interceptors = append(options.Interceptors, f)
+ })
+}
+
+// WithChainConsumerInterceptor returns a ConsumerOption that specifies the chained interceptor for consumer.
+// The first interceptor will be the outer most, while the last interceptor will be the inner most wrapper
+// around the real call.
+func WithChainConsumerInterceptor(fs ...CInterceptor) *ConsumerOption {
+ return NewConsumerOption(func(options *ConsumerOptions) {
+ options.Interceptors = append(options.Interceptors, fs...)
+ })
}
func (opt *ClientOption) ChangeInstanceNameToPID() {
diff --git a/primitive/result.go b/primitive/result.go
index c37358c..628f243 100644
--- a/primitive/result.go
+++ b/primitive/result.go
@@ -56,10 +56,10 @@ func (result *SendResult) String() string {
result.Status, result.MsgID, result.OffsetMsgID, result.QueueOffset, result.MessageQueue.String())
}
-// PullStatus pull status
+// PullStatus is the status of a pull operation.
type PullStatus int
-// predefined pull status
+// Predefined pull statuses.
const (
PullFound PullStatus = iota
PullNoNewMsg
@@ -75,7 +75,11 @@ type PullResult struct {
MaxOffset int64
Status PullStatus
SuggestWhichBrokerId int64
+
+ // messageExts message info
messageExts []*MessageExt
+ // body is the raw data stored by SetBody and returned by GetBody
+ body []byte
}
func (result *PullResult) GetMessageExts() []*MessageExt {
@@ -93,11 +97,19 @@ func (result *PullResult) GetMessages() []*Message {
return toMessages(result.messageExts)
}
+func (result *PullResult) SetBody(data []byte) {
+ result.body = data
+}
+
+func (result *PullResult) GetBody() []byte {
+ return result.body
+}
+
func (result *PullResult) String() string {
return ""
}
-func decodeMessage(data []byte) []*MessageExt {
+func DecodeMessage(data []byte) []*MessageExt {
msgs := make([]*MessageExt, 0)
buf := bytes.NewBuffer(data)
count := 0