You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2019/07/06 13:30:41 UTC

[rocketmq-client-go] branch native updated: [ISSUE #86] Add Interceptor for producer and consumer. (#85)

This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 3a3f93b  [ISSUE #86] Add Interceptor for producer and consumer. (#85)
3a3f93b is described below

commit 3a3f93bf5d18d8e680146c50e76af730e72595fc
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Sat Jul 6 21:30:36 2019 +0800

    [ISSUE #86] Add Interceptor for producer and consumer. (#85)
    
    * add interceptor
    
    * add log
    
    * add producer example
    
    * refactor code according to new version
    
    * refactor code according to new version
    
    * add example
    
    * fix nil bug
    
    * delete extra code
    
    * delete test code
    
    * add comment. resolves #86
    
    * rename
    
    * rename
    
    * stash
    
    * stash
    
    * fix  bug
    
    * stash
    
    * refactor consumer interceptor
    
    * add example
    
    * add example
    
    * 重构interceptor
    
    * fix typo
    
    * add ctx key
    
    * remove extra code
    
    * add ctx to conusme
    
    * refactor consumer interceptor
    
    * refactor consumer interceptor
    
    * refactor consumer interceptor
    
    * lower case chainInterceptor
    
    * rename println
---
 examples/consumer/interceptor/main.go       |  76 +++++++++++++++++
 examples/consumer/{ => simple}/main.go      |  10 +--
 examples/producer/{ => interceptor}/main.go |  32 +++++--
 examples/producer/{ => simple}/main.go      |   8 +-
 internal/consumer/consumer.go               |   2 +-
 internal/consumer/pull_consumer.go          |   4 +-
 internal/consumer/push_consumer.go          | 124 +++++++++++++++++++---------
 internal/kernel/client.go                   |  56 +++++++------
 internal/producer/producer.go               | 101 ++++++++++++++++++----
 primitive/consume.go                        |  16 +++-
 primitive/ctx.go                            |  60 ++++++++++++++
 primitive/interceptor.go                    |  57 +++++++++++++
 primitive/options.go                        |  99 +++++++++++++++++++++-
 primitive/result.go                         |  18 +++-
 14 files changed, 556 insertions(+), 107 deletions(-)

diff --git a/examples/consumer/interceptor/main.go b/examples/consumer/interceptor/main.go
new file mode 100644
index 0000000..01fedc3
--- /dev/null
+++ b/examples/consumer/interceptor/main.go
@@ -0,0 +1,76 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/apache/rocketmq-client-go/internal/consumer"
+	"github.com/apache/rocketmq-client-go/primitive"
+)
+
+func main() {
+	c, _ := consumer.NewPushConsumer("testGroup", "127.0.0.1:9876",
+		primitive.WithConsumerModel(primitive.Clustering),
+		primitive.WithConsumeFromWhere(primitive.ConsumeFromFirstOffset),
+		primitive.WithChainConsumerInterceptor(UserFistInterceptor(), UserSecondInterceptor()))
+	err := c.Subscribe("TopicTest", primitive.MessageSelector{}, func(ctx *primitive.ConsumeMessageContext,
+		msgs []*primitive.MessageExt) (primitive.ConsumeResult, error) {
+		fmt.Printf("subscribe callback: %v\n", msgs) // Printf: the string carries a format verb
+		return primitive.ConsumeSuccess, nil
+	})
+	if err != nil {
+		fmt.Println(err.Error())
+	}
+	// Note: Start must be called after Subscribe.
+	err = c.Start()
+	if err != nil {
+		fmt.Println(err.Error())
+		os.Exit(-1)
+	}
+	time.Sleep(time.Hour) // keep the process alive so the consumer keeps receiving
+}
+
+func UserFistInterceptor() primitive.CInterceptor {
+	return func(ctx context.Context, req, reply interface{}, next primitive.CInvoker) error {
+		msgCtx, _ := primitive.GetConsumerCtx(ctx)
+		fmt.Printf("msgCtx: %v, method: %s\n", msgCtx, primitive.GetMethod(ctx))
+		// In the push consumer path req is the message batch and reply the result holder.
+		msgs := req.([]*primitive.MessageExt)
+		fmt.Printf("user first interceptor before invoke: %v\n", msgs)
+		e := next(ctx, msgs, reply)
+		// reply is filled in by the final invoker once next returns.
+		holder := reply.(*primitive.ConsumeResultHolder)
+		fmt.Printf("user first interceptor after invoke: %v, result: %v\n", msgs, holder)
+		return e
+	}
+}
+
+func UserSecondInterceptor() primitive.CInterceptor {
+	return func(ctx context.Context, req, reply interface{}, next primitive.CInvoker) error {
+		msgs := req.([]*primitive.MessageExt) // the push consumer passes the message batch as req
+		fmt.Printf("user second interceptor before invoke: %v\n", msgs)
+		e := next(ctx, msgs, reply)
+		holder := reply.(*primitive.ConsumeResultHolder) // filled in by the final invoker
+		fmt.Printf("user second interceptor after invoke: %v, result: %v\n", msgs, holder)
+		return e
+	}
+}
diff --git a/examples/consumer/main.go b/examples/consumer/simple/main.go
similarity index 79%
rename from examples/consumer/main.go
rename to examples/consumer/simple/main.go
index f433f73..70bbbd4 100644
--- a/examples/consumer/main.go
+++ b/examples/consumer/simple/main.go
@@ -27,14 +27,10 @@ import (
 )
 
 func main() {
-	c, _ := consumer.NewPushConsumer("testGroup", primitive.ConsumerOption{
-		NameServerAddr: "127.0.0.1:9876",
-		ConsumerModel:  primitive.Clustering,
-		FromWhere:      primitive.ConsumeFromFirstOffset,
-	})
-	err := c.Subscribe("test", primitive.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext,
+	c, _ := consumer.NewPushConsumer("testGroup", "127.0.0.1:9876")
+	err := c.Subscribe("TopicTest", primitive.MessageSelector{}, func(ctx *primitive.ConsumeMessageContext,
 		msgs []*primitive.MessageExt) (primitive.ConsumeResult, error) {
-		fmt.Println(msgs)
+		fmt.Printf("subscribe callback: %v\n", msgs)
 		return primitive.ConsumeSuccess, nil
 	})
 	if err != nil {
diff --git a/examples/producer/main.go b/examples/producer/interceptor/main.go
similarity index 56%
copy from examples/producer/main.go
copy to examples/producer/interceptor/main.go
index b52b266..f7bcf7a 100644
--- a/examples/producer/main.go
+++ b/examples/producer/interceptor/main.go
@@ -15,6 +15,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
+// Package main implements a producer with user custom interceptor.
 package main
 
 import (
@@ -27,19 +28,18 @@ import (
 )
 
 func main() {
-	opt := primitive.ProducerOptions{
-		NameServerAddr:           "127.0.0.1:9876",
-		RetryTimesWhenSendFailed: 2,
-	}
-	p, _ := producer.NewProducer(opt)
+	nameServerAddr := "127.0.0.1:9876"
+	p, _ := producer.NewProducer(nameServerAddr, primitive.WithRetry(2),
+		primitive.WithChainProducerInterceptor(UserFirstInterceptor(), UserSecondInterceptor()))
 	err := p.Start()
 	if err != nil {
 		fmt.Printf("start producer error: %s", err.Error())
 		os.Exit(1)
 	}
-	for i := 0; i < 1000; i++ {
+	for i := 0; i < 10; i++ {
 		res, err := p.SendSync(context.Background(), &primitive.Message{
-			Topic: "test",
+			//Topic: "test",
+			Topic: "TopicTest",
 			Body:  []byte("Hello RocketMQ Go Client!"),
 		})
 
@@ -54,3 +54,21 @@ func main() {
 		fmt.Printf("shundown producer error: %s", err.Error())
 	}
 }
+
+func UserFirstInterceptor() primitive.PInterceptor {
+	return func(ctx context.Context, req, reply interface{}, next primitive.PInvoker) error {
+		fmt.Printf("user first interceptor before invoke: req:%v, reply: %v\n", req, reply)
+		err := next(ctx, req, reply)
+		fmt.Printf("user first interceptor after invoke: req: %v, reply: %v \n", req, reply)
+		return err
+	}
+}
+
+func UserSecondInterceptor() primitive.PInterceptor {
+	return func(ctx context.Context, req, reply interface{}, next primitive.PInvoker) error {
+		fmt.Printf("user second interceptor before invoke: req: %v, reply: %v\n", req, reply)
+		err := next(ctx, req, reply)
+		fmt.Printf("user second interceptor after invoke: req: %v, reply: %v \n", req, reply)
+		return err
+	}
+}
diff --git a/examples/producer/main.go b/examples/producer/simple/main.go
similarity index 89%
rename from examples/producer/main.go
rename to examples/producer/simple/main.go
index b52b266..39c885d 100644
--- a/examples/producer/main.go
+++ b/examples/producer/simple/main.go
@@ -26,12 +26,10 @@ import (
 	"github.com/apache/rocketmq-client-go/primitive"
 )
 
+// main starts a simple producer and sends messages.
 func main() {
-	opt := primitive.ProducerOptions{
-		NameServerAddr:           "127.0.0.1:9876",
-		RetryTimesWhenSendFailed: 2,
-	}
-	p, _ := producer.NewProducer(opt)
+	nameServerAddr := "127.0.0.1:9876"
+	p, _ := producer.NewProducer(nameServerAddr, primitive.WithRetry(2))
 	err := p.Start()
 	if err != nil {
 		fmt.Printf("start producer error: %s", err.Error())
diff --git a/internal/consumer/consumer.go b/internal/consumer/consumer.go
index e9aa1b0..ecbbbb4 100644
--- a/internal/consumer/consumer.go
+++ b/internal/consumer/consumer.go
@@ -99,7 +99,7 @@ type defaultConsumer struct {
 	state     kernel.ServiceState
 	pause     bool
 	once      sync.Once
-	option    primitive.ConsumerOption
+	option    primitive.ConsumerOptions
 	// key: int, hash(*primitive.MessageQueue)
 	// value: *processQueue
 	processQueueTable sync.Map
diff --git a/internal/consumer/pull_consumer.go b/internal/consumer/pull_consumer.go
index ead633f..2338199 100644
--- a/internal/consumer/pull_consumer.go
+++ b/internal/consumer/pull_consumer.go
@@ -38,7 +38,7 @@ var (
 	queueCounterTable sync.Map
 )
 
-func NewConsumer(config primitive.ConsumerOption) *defaultPullConsumer {
+func NewConsumer(config primitive.ConsumerOptions) *defaultPullConsumer {
 	return &defaultPullConsumer{
 		option: config,
 	}
@@ -46,7 +46,7 @@ func NewConsumer(config primitive.ConsumerOption) *defaultPullConsumer {
 
 type defaultPullConsumer struct {
 	state     kernel.ServiceState
-	option    primitive.ConsumerOption
+	option    primitive.ConsumerOptions
 	client    *kernel.RMQClient
 	GroupName string
 	Model     primitive.MessageModel
diff --git a/internal/consumer/push_consumer.go b/internal/consumer/push_consumer.go
index be839bc..8acbc4c 100644
--- a/internal/consumer/push_consumer.go
+++ b/internal/consumer/push_consumer.go
@@ -49,46 +49,51 @@ type PushConsumer interface {
 	Start() error
 	Shutdown()
 	Subscribe(topic string, selector primitive.MessageSelector,
-		f func(*ConsumeMessageContext, []*primitive.MessageExt) (primitive.ConsumeResult, error)) error
+		f func(*primitive.ConsumeMessageContext, []*primitive.MessageExt) (primitive.ConsumeResult, error)) error
 }
 
 type pushConsumer struct {
 	*defaultConsumer
 	queueFlowControlTimes        int
 	queueMaxSpanFlowControlTimes int
-	consume                      func(*ConsumeMessageContext, []*primitive.MessageExt) (primitive.ConsumeResult, error)
+	consume                      func(*primitive.ConsumeMessageContext, []*primitive.MessageExt) (primitive.ConsumeResult, error)
 	submitToConsume              func(*processQueue, *primitive.MessageQueue)
 	subscribedTopic              map[string]string
+
+	interceptor primitive.CInterceptor
 }
 
-func NewPushConsumer(consumerGroup string, opt primitive.ConsumerOption) (PushConsumer, error) {
-	if err := utils.VerifyIP(opt.NameServerAddr); err != nil {
+func NewPushConsumer(consumerGroup string, nameServerAddr string, opts ...*primitive.ConsumerOption) (PushConsumer, error) {
+	if err := utils.VerifyIP(nameServerAddr); err != nil {
 		return nil, err
 	}
-	opt.InstanceName = "DEFAULT"
-	opt.ClientIP = utils.LocalIP()
-	if opt.NameServerAddr == "" {
-		rlog.Fatal("opt.NameServerAddr can't be empty")
+	if nameServerAddr == "" {
+		rlog.Fatal("opts.NameServerAddr can't be empty")
 	}
-	err := os.Setenv(kernel.EnvNameServerAddr, opt.NameServerAddr)
+	err := os.Setenv(kernel.EnvNameServerAddr, nameServerAddr)
 	if err != nil {
 		rlog.Fatal("set env=EnvNameServerAddr error: %s ", err.Error())
 	}
+
+	pushOpts := primitive.DefaultPushConsumerOptions()
+	for _, op := range opts {
+		op.Apply(&pushOpts)
+	}
+
+	pushOpts.NameServerAddr = nameServerAddr
+
 	dc := &defaultConsumer{
 		consumerGroup:  consumerGroup,
 		cType:          _PushConsume,
 		state:          kernel.StateCreateJust,
 		prCh:           make(chan PullRequest, 4),
-		model:          opt.ConsumerModel,
-		consumeOrderly: opt.ConsumeOrderly,
-		fromWhere:      opt.FromWhere,
-		option:         opt,
+		model:          pushOpts.ConsumerModel,
+		consumeOrderly: pushOpts.ConsumeOrderly,
+		fromWhere:      pushOpts.FromWhere,
+		allocate:       pushOpts.Strategy,
+		option:         pushOpts,
 	}
 
-	if opt.Strategy == nil {
-		opt.Strategy = primitive.AllocateByAveragely
-	}
-	dc.allocate = opt.Strategy
 	p := &pushConsumer{
 		defaultConsumer: dc,
 		subscribedTopic: make(map[string]string, 0),
@@ -99,9 +104,37 @@ func NewPushConsumer(consumerGroup string, opt primitive.ConsumerOption) (PushCo
 	} else {
 		p.submitToConsume = p.consumeMessageCurrently
 	}
+
+	chainInterceptor(p)
+
 	return p, nil
 }
 
+// chainInterceptor folds p.option.Interceptors into p.interceptor: nil when empty, the sole entry when there is one, else a wrapper that runs the first entry with the rest as its invoker.
+func chainInterceptor(p *pushConsumer) {
+	interceptors := p.option.Interceptors
+	switch len(interceptors) {
+	case 0:
+		p.interceptor = nil
+	case 1:
+		p.interceptor = interceptors[0]
+	default:
+		p.interceptor = func(ctx context.Context, req, reply interface{}, invoker primitive.CInvoker) error {
+			return interceptors[0](ctx, req, reply, getChainedInterceptor(interceptors, 0, invoker))
+		}
+	}
+}
+
+// getChainedInterceptor returns the invoker passed to interceptors[cur]: finalInvoker when cur is the last index, else a closure that runs interceptors[cur+1] with its own recursively built invoker.
+func getChainedInterceptor(interceptors []primitive.CInterceptor, cur int, finalInvoker primitive.CInvoker) primitive.CInvoker {
+	if cur == len(interceptors)-1 {
+		return finalInvoker
+	}
+	return func(ctx context.Context, req, reply interface{}) error {
+		return interceptors[cur+1](ctx, req, reply, getChainedInterceptor(interceptors, cur+1, finalInvoker))
+	}
+}
+
 func (pc *pushConsumer) Start() error {
 	var err error
 	pc.once.Do(func() {
@@ -164,7 +197,7 @@ func (pc *pushConsumer) Start() error {
 func (pc *pushConsumer) Shutdown() {}
 
 func (pc *pushConsumer) Subscribe(topic string, selector primitive.MessageSelector,
-	f func(*ConsumeMessageContext, []*primitive.MessageExt) (primitive.ConsumeResult, error)) error {
+	f func(*primitive.ConsumeMessageContext, []*primitive.MessageExt) (primitive.ConsumeResult, error)) error {
 	if pc.state != kernel.StateCreateJust {
 		return errors.New("subscribe topic only started before")
 	}
@@ -424,6 +457,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			sleepTime = _PullDelayTimeWhenError
 			goto NEXT
 		}
+
 		result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
 		if err != nil {
 			rlog.Warnf("pull message from %s error: %s", brokerResult.BrokerAddr, err.Error())
@@ -447,6 +481,8 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			rt := time.Now().Sub(beginTime)
 			increasePullRT(pc.consumerGroup, request.mq.Topic, rt)
 
+			result.SetMessageExts(primitive.DecodeMessage(result.GetBody()))
+
 			msgFounded := result.GetMessageExts()
 			firstMsgOffset := int64(math.MaxInt64)
 			if msgFounded != nil && len(msgFounded) != 0 {
@@ -485,7 +521,7 @@ func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) {
 	// TODO
 }
 
-func (pc *pushConsumer) sendMessageBack(ctx *ConsumeMessageContext, msg *primitive.MessageExt) bool {
+func (pc *pushConsumer) sendMessageBack(ctx *primitive.ConsumeMessageContext, msg *primitive.MessageExt) bool {
 	return true
 }
 
@@ -570,16 +606,6 @@ func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq *primitive.MessageQueue
 	return true
 }
 
-type ConsumeMessageContext struct {
-	consumerGroup string
-	msgs          []*primitive.MessageExt
-	mq            *primitive.MessageQueue
-	success       bool
-	status        string
-	// mqTractContext
-	properties map[string]string
-}
-
 func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.MessageQueue) {
 	msgs := pq.getMessages()
 	if msgs == nil {
@@ -603,8 +629,8 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
 				return
 			}
 
-			ctx := &ConsumeMessageContext{
-				properties: make(map[string]string),
+			msgCtx := &primitive.ConsumeMessageContext{
+				Properties: make(map[string]string),
 			}
 			// TODO hook
 			beginTime := time.Now()
@@ -620,16 +646,40 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
 						beginTime.UnixNano()/int64(time.Millisecond), 10)
 				}
 			}
-			result, err := pc.consume(ctx, subMsgs)
+			var result primitive.ConsumeResult
+
+			var err error
+			if pc.interceptor == nil {
+				result, err = pc.consume(msgCtx, subMsgs)
+			} else {
+				var container primitive.ConsumeResultHolder
+
+				ctx := context.Background()
+				ctx = primitive.WithConsumerCtx(ctx, msgCtx)
+				ctx = primitive.WithMehod(ctx, primitive.ConsumerPush)
+
+				err = pc.interceptor(ctx, subMsgs, &container, func(ctx context.Context, req, reply interface{}) error {
+					consumerCtx, _ := primitive.GetConsumerCtx(ctx)
+
+					msgs := req.([]*primitive.MessageExt)
+					r, e := pc.consume(consumerCtx, msgs)
+
+					realReply := reply.(*primitive.ConsumeResultHolder)
+					realReply.ConsumeResult = r
+					return e
+				})
+				result = container.ConsumeResult
+			}
+
 			consumeRT := time.Now().Sub(beginTime)
 			if err != nil {
-				ctx.properties["ConsumeContextType"] = "EXCEPTION"
+				msgCtx.Properties["ConsumeContextType"] = "EXCEPTION"
 			} else if consumeRT >= pc.option.ConsumeTimeout {
-				ctx.properties["ConsumeContextType"] = "TIMEOUT"
+				msgCtx.Properties["ConsumeContextType"] = "TIMEOUT"
 			} else if result == primitive.ConsumeSuccess {
-				ctx.properties["ConsumeContextType"] = "SUCCESS"
+				msgCtx.Properties["ConsumeContextType"] = "SUCCESS"
 			} else {
-				ctx.properties["ConsumeContextType"] = "RECONSUME_LATER"
+				msgCtx.Properties["ConsumeContextType"] = "RECONSUME_LATER"
 			}
 
 			// TODO hook
@@ -648,7 +698,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
 					} else {
 						for i := 0; i < len(msgs); i++ {
 							msg := msgs[i]
-							if !pc.sendMessageBack(ctx, msg) {
+							if !pc.sendMessageBack(msgCtx, msg) {
 								msg.ReconsumeTimes += 1
 								msgBackFailed = append(msgBackFailed, msg)
 							}
diff --git a/internal/kernel/client.go b/internal/kernel/client.go
index 4687989..94f45c9 100644
--- a/internal/kernel/client.go
+++ b/internal/kernel/client.go
@@ -292,7 +292,7 @@ func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, r
 	return nil, err
 }
 
-func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, msgs ...*primitive.Message) *primitive.SendResult {
+func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) {
 	var status primitive.SendStatus
 	switch cmd.Code {
 	case ResFlushDiskTimeout:
@@ -321,20 +321,20 @@ func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
 
 	qId, _ := strconv.Atoi(cmd.ExtFields["queueId"])
 	off, _ := strconv.ParseInt(cmd.ExtFields["queueOffset"], 10, 64)
-	return &primitive.SendResult{
-		Status:      status,
-		MsgID:       cmd.ExtFields["msgId"],
-		OffsetMsgID: cmd.ExtFields["msgId"],
-		MessageQueue: &primitive.MessageQueue{
-			Topic:      msgs[0].Topic,
-			BrokerName: brokerName,
-			QueueId:    qId,
-		},
-		QueueOffset: off,
-		//TransactionID: sendResponse.TransactionId,
-		RegionID: regionId,
-		TraceOn:  trace != "" && trace != _TranceOff,
+
+	resp.Status = status
+	resp.MsgID = cmd.ExtFields["msgId"]
+	resp.OffsetMsgID = cmd.ExtFields["msgId"]
+	resp.MessageQueue = &primitive.MessageQueue{
+		Topic:      msgs[0].Topic,
+		BrokerName: brokerName,
+		QueueId:    qId,
 	}
+	resp.QueueOffset = off
+	//TransactionID: sendResponse.TransactionId,
+	resp.RegionID = regionId
+	resp.TraceOn = trace != "" && trace != _TranceOff
+
 }
 
 // PullMessage with sync
@@ -349,6 +349,7 @@ func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, request
 }
 
 func (c *RMQClient) processPullResponse(response *remote.RemotingCommand) (*primitive.PullResult, error) {
+
 	pullResult := &primitive.PullResult{}
 	switch response.Code {
 	case ResSuccess:
@@ -363,29 +364,32 @@ func (c *RMQClient) processPullResponse(response *remote.RemotingCommand) (*prim
 		return nil, fmt.Errorf("unknown Response Code: %d, remark: %s", response.Code, response.Remark)
 	}
 
-	v, exist := response.ExtFields["maxOffset"]
+	c.decodeCommandCustomHeader(pullResult, response)
+	pullResult.SetBody(response.Body)
+
+	return pullResult, nil
+}
+
+func (c *RMQClient) decodeCommandCustomHeader(pr *primitive.PullResult, cmd *remote.RemotingCommand) {
+	v, exist := cmd.ExtFields["maxOffset"]
 	if exist {
-		pullResult.MaxOffset, _ = strconv.ParseInt(v, 10, 64)
+		pr.MaxOffset, _ = strconv.ParseInt(v, 10, 64)
 	}
 
-	v, exist = response.ExtFields["minOffset"]
+	v, exist = cmd.ExtFields["minOffset"]
 	if exist {
-		pullResult.MinOffset, _ = strconv.ParseInt(v, 10, 64)
+		pr.MinOffset, _ = strconv.ParseInt(v, 10, 64)
 	}
 
-	v, exist = response.ExtFields["nextBeginOffset"]
+	v, exist = cmd.ExtFields["nextBeginOffset"]
 	if exist {
-		pullResult.NextBeginOffset, _ = strconv.ParseInt(v, 10, 64)
+		pr.NextBeginOffset, _ = strconv.ParseInt(v, 10, 64)
 	}
 
-	v, exist = response.ExtFields["suggestWhichBrokerId"]
+	v, exist = cmd.ExtFields["suggestWhichBrokerId"]
 	if exist {
-		pullResult.SuggestWhichBrokerId, _ = strconv.ParseInt(v, 10, 64)
+		pr.SuggestWhichBrokerId, _ = strconv.ParseInt(v, 10, 64)
 	}
-
-	//pullResult.messageExts = decodeMessage(response.Body) TODO parse in top
-
-	return pullResult, nil
 }
 
 // PullMessageAsync pull message async
diff --git a/internal/producer/producer.go b/internal/producer/producer.go
index 91bd389..fbcf5ee 100644
--- a/internal/producer/producer.go
+++ b/internal/producer/producer.go
@@ -40,25 +40,59 @@ type Producer interface {
 	SendOneWay(context.Context, *primitive.Message) error
 }
 
-func NewProducer(opt primitive.ProducerOptions) (Producer, error) {
-	if err := utils.VerifyIP(opt.NameServerAddr); err != nil {
+func NewProducer(nameServerAddr string, opts ...*primitive.ProducerOption) (Producer, error) {
+	if err := utils.VerifyIP(nameServerAddr); err != nil {
 		return nil, err
 	}
-	if opt.RetryTimesWhenSendFailed == 0 {
-		opt.RetryTimesWhenSendFailed = 2
-	}
-	if opt.NameServerAddr == "" {
-		rlog.Fatal("opt.NameServerAddr can't be empty")
+
+	if nameServerAddr == "" {
+		rlog.Fatal("nameServerAddr can't be empty")
 	}
-	err := os.Setenv(kernel.EnvNameServerAddr, opt.NameServerAddr)
+	err := os.Setenv(kernel.EnvNameServerAddr, nameServerAddr)
 	if err != nil {
 		rlog.Fatal("set env=EnvNameServerAddr error: %s ", err.Error())
 	}
-	return &defaultProducer{
+
+	popts := primitive.DefaultProducerOptions()
+	for _, opt := range opts {
+		opt.Apply(&popts)
+	}
+	popts.NameServerAddr = nameServerAddr
+
+	producer := &defaultProducer{
 		group:   "default",
-		client:  kernel.GetOrNewRocketMQClient(opt.ClientOption),
-		options: opt,
-	}, nil
+		client:  kernel.GetOrNewRocketMQClient(popts.ClientOption),
+		options: popts,
+	}
+
+	chainInterceptor(producer)
+
+	return producer, nil
+}
+
+// chainInterceptor folds p.options.Interceptors into p.interceptor: nil when empty, the sole entry when there is one, else a wrapper that runs the first entry with the rest as its invoker.
+func chainInterceptor(p *defaultProducer) {
+	interceptors := p.options.Interceptors
+	switch len(interceptors) {
+	case 0:
+		p.interceptor = nil
+	case 1:
+		p.interceptor = interceptors[0]
+	default:
+		p.interceptor = func(ctx context.Context, req, reply interface{}, invoker primitive.PInvoker) error {
+			return interceptors[0](ctx, req, reply, getChainedInterceptor(interceptors, 0, invoker))
+		}
+	}
+}
+
+// getChainedInterceptor returns the invoker passed to interceptors[cur]: finalInvoker when cur is the last index, else a closure that runs interceptors[cur+1] with its own recursively built invoker.
+func getChainedInterceptor(interceptors []primitive.PInterceptor, cur int, finalInvoker primitive.PInvoker) primitive.PInvoker {
+	if cur == len(interceptors)-1 {
+		return finalInvoker
+	}
+	return func(ctx context.Context, req, reply interface{}) error {
+		return interceptors[cur+1](ctx, req, reply, getChainedInterceptor(interceptors, cur+1, finalInvoker))
+	}
+}
 
 type defaultProducer struct {
@@ -67,6 +101,8 @@ type defaultProducer struct {
 	state       kernel.ServiceState
 	options     primitive.ProducerOptions
 	publishInfo sync.Map
+
+	interceptor primitive.PInterceptor
 }
 
 func (p *defaultProducer) Start() error {
@@ -89,11 +125,31 @@ func (p *defaultProducer) SendSync(ctx context.Context, msg *primitive.Message)
 		return nil, errors.New("topic is nil")
 	}
 
+	resp := new(primitive.SendResult)
+	if p.interceptor != nil {
+		ctx = primitive.WithMehod(ctx, primitive.SendSync) // WithValue returns a derived ctx; keep it
+		err := p.interceptor(ctx, msg, resp, func(ctx context.Context, req, reply interface{}) error {
+			realReq := req.(*primitive.Message)
+			realReply := reply.(*primitive.SendResult)
+			return p.sendSync(ctx, realReq, realReply)
+		})
+		return resp, err
+	}
+
+	if err := p.sendSync(ctx, msg, resp); err != nil { // a failed send must not look like success
+		return nil, err
+	}
+	return resp, nil
+}
+
+func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, resp *primitive.SendResult) error {
+
 	retryTime := 1 + p.options.RetryTimesWhenSendFailed
 
 	var (
 		err error
 	)
+
 	for retryCount := 0; retryCount < retryTime; retryCount++ {
 		mq := p.selectMessageQueue(msg.Topic)
 		if mq == nil {
@@ -103,7 +159,7 @@ func (p *defaultProducer) SendSync(ctx context.Context, msg *primitive.Message)
 
 		addr := kernel.FindBrokerAddrByName(mq.BrokerName)
 		if addr == "" {
-			return nil, fmt.Errorf("topic=%s route info not found", mq.Topic)
+			return fmt.Errorf("topic=%s route info not found", mq.Topic)
 		}
 
 		res, _err := p.client.InvokeSync(addr, p.buildSendRequest(mq, msg), 3*time.Second)
@@ -111,9 +167,10 @@ func (p *defaultProducer) SendSync(ctx context.Context, msg *primitive.Message)
 			err = _err
 			continue
 		}
-		return p.client.ProcessSendResponse(mq.BrokerName, res, msg), nil
+		p.client.ProcessSendResponse(mq.BrokerName, res, resp, msg)
+		return nil
 	}
-	return nil, err
+	return err
 }
 
 func (p *defaultProducer) SendOneWay(ctx context.Context, msg *primitive.Message) error {
@@ -125,6 +182,17 @@ func (p *defaultProducer) SendOneWay(ctx context.Context, msg *primitive.Message
 		return errors.New("topic is nil")
 	}
 
+	if p.interceptor != nil {
+		ctx = primitive.WithMehod(ctx, primitive.SendOneway) // WithValue returns a derived ctx; keep it
+		return p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error {
+			// Use the unexported sender: calling SendOneWay here re-enters the interceptor without end.
+			return p.sendOneWay(ctx, req.(*primitive.Message))
+		})
+	}
+	return p.sendOneWay(ctx, msg)
+}
+
+func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message) error {
 	retryTime := 1 + p.options.RetryTimesWhenSendFailed
 
 	var (
@@ -151,7 +219,8 @@ func (p *defaultProducer) SendOneWay(ctx context.Context, msg *primitive.Message
 	return err
 }
 
-func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue, msg *primitive.Message) *remote.RemotingCommand {
+func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue,
+	msg *primitive.Message) *remote.RemotingCommand {
 	req := &kernel.SendMessageRequest{
 		ProducerGroup:  p.group,
 		Topic:          mq.Topic,
diff --git a/primitive/consume.go b/primitive/consume.go
index 40e89b9..5edd461 100644
--- a/primitive/consume.go
+++ b/primitive/consume.go
@@ -4,7 +4,7 @@ package primitive
 // </p>
 //
 // RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with
-// the same {@link #consumerGroup} would only consume shards of the messages subscribed, which achieves load
+// the same {@link #ConsumerGroup} would only consume shards of the messages subscribed, which achieves load
 // balances; Conversely, if the broadcasting is set, each consumer client will consume all subscribed messages
 // separately.
 // </p>
@@ -126,3 +126,17 @@ const (
 	ConsumeSuccess ConsumeResult = iota
 	ConsumeRetryLater
 )
+
+type ConsumeMessageContext struct {
+	ConsumerGroup string
+	Msgs          []*MessageExt
+	MQ            *MessageQueue
+	Success       bool
+	Status        string
+	// mqTractContext
+	Properties map[string]string
+}
+
+type ConsumeResultHolder struct {
+	ConsumeResult
+}
\ No newline at end of file
diff --git a/primitive/ctx.go b/primitive/ctx.go
new file mode 100644
index 0000000..e74f91a
--- /dev/null
+++ b/primitive/ctx.go
@@ -0,0 +1,60 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+/*
+ * Define the ctx key and value type.
+ */
+package primitive
+
+import "context"
+
+type CtxKey int // CtxKey is the type of the context keys defined below.
+
+const (
+	method CtxKey = iota
+	msgCtx
+
+	// method names used by the producer
+	SendSync   = "SendSync"
+	SendOneway = "SendOneway"
+	// method names used by the consumer
+	ConsumerPush = "ConsumerPush"
+	ConsumerPull = "ConsumerPull"
+)
+
+// WithMehod returns a context derived from ctx that carries the call method name m.
+func WithMehod(ctx context.Context, m string) context.Context {
+	return context.WithValue(ctx, method, m)
+}
+
+// GetMethod returns the method name stored by WithMehod, or "" when ctx carries none.
+func GetMethod(ctx context.Context) string {
+	m, _ := ctx.Value(method).(string) // comma-ok: a missing value must not panic
+	return m
+}
+
+// WithConsumerCtx returns a context derived from ctx that carries the ConsumeMessageContext c.
+func WithConsumerCtx(ctx context.Context, c *ConsumeMessageContext) context.Context {
+	return context.WithValue(ctx, msgCtx, c)
+}
+
+// GetConsumerCtx returns the ConsumeMessageContext stored by WithConsumerCtx. It is only meaningful in
+// PushConsumer calls, so the bool result reports whether a value was present.
+func GetConsumerCtx(ctx context.Context) (*ConsumeMessageContext, bool) {
+	c, exist := ctx.Value(msgCtx).(*ConsumeMessageContext)
+	return c, exist
+}
diff --git a/primitive/interceptor.go b/primitive/interceptor.go
new file mode 100644
index 0000000..cb86d19
--- /dev/null
+++ b/primitive/interceptor.go
@@ -0,0 +1,57 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package primitive
+
+import (
+	"context"
+)
+
+// PInvoker finishes a send invoke on the producer. req and reply are passed as
+// interface{} values; their concrete types are not fixed by this declaration.
+type PInvoker func(ctx context.Context, req, reply interface{}) error
+
+// PInterceptor intercepts the execution of a send invoke on the producer.
+// next is the PInvoker handed to the interceptor (presumably called to
+// continue the send; verify against the producer's chaining code).
+type PInterceptor func(ctx context.Context, req, reply interface{}, next PInvoker) error
+
+// RetryPInterceptor returns a PInterceptor intended to retry when a send fails.
+// The retry logic is not implemented yet, so for now the interceptor forwards
+// the call to next and returns its error; this keeps the send from being
+// silently dropped (and reported as success) when the interceptor is installed.
+// A nil next yields nil.
+func RetryPInterceptor() PInterceptor {
+	return func(ctx context.Context, req, reply interface{}, next PInvoker) error {
+		if next == nil {
+			return nil
+		}
+		return next(ctx, req, reply)
+	}
+}
+
+// TimeoutPInterceptor returns a PInterceptor intended to add a timeout listener
+// in case an operation times out. The timeout logic is not implemented yet, so
+// for now the interceptor forwards the call to next and returns its error;
+// this keeps the send from being silently dropped when the interceptor is
+// installed. A nil next yields nil.
+func TimeoutPInterceptor() PInterceptor {
+	return func(ctx context.Context, req, reply interface{}, next PInvoker) error {
+		if next == nil {
+			return nil
+		}
+		return next(ctx, req, reply)
+	}
+}
+
+// LogPInterceptor returns a PInterceptor intended to log a send invoke.
+// The logging is not implemented yet, so for now the interceptor forwards the
+// call to next and returns its error; this keeps the send from being silently
+// dropped when the interceptor is installed. A nil next yields nil.
+func LogPInterceptor() PInterceptor {
+	return func(ctx context.Context, req, reply interface{}, next PInvoker) error {
+		if next == nil {
+			return nil
+		}
+		return next(ctx, req, reply)
+	}
+}
+
+// CInvoker finishes a message invoke on the consumer. In a PushConsumer call,
+// req is a []*MessageExt and reply is a *ConsumeResultHolder; use a type
+// assertion to get the real type.
+type CInvoker func(ctx context.Context, req, reply interface{}) error
+
+// CInterceptor intercepts the invoke of a consume on messages. In a
+// PushConsumer call, req is a []*MessageExt and reply is a
+// *ConsumeResultHolder; use a type assertion to get the real type.
+type CInterceptor func(ctx context.Context, req, reply interface{}, next CInvoker) error
diff --git a/primitive/options.go b/primitive/options.go
index b657ac0..0e40415 100644
--- a/primitive/options.go
+++ b/primitive/options.go
@@ -22,9 +22,13 @@ import (
 	"os"
 	"strconv"
 	"time"
+
+	"github.com/apache/rocketmq-client-go/utils"
 )
 
 type ProducerOptions struct {
+	Interceptors []PInterceptor
+
 	ClientOption
 	NameServerAddr           string
 	GroupName                string
@@ -32,7 +36,48 @@ type ProducerOptions struct {
 	UnitMode                 bool
 }
 
-type ConsumerOption struct {
+// DefaultProducerOptions returns a ProducerOptions whose
+// RetryTimesWhenSendFailed is 2; every other field keeps its zero value.
+func DefaultProducerOptions() ProducerOptions {
+	var opts ProducerOptions
+	opts.RetryTimesWhenSendFailed = 2
+	return opts
+}
+
+// ProducerOption configures how we create the producer by setting values on a
+// ProducerOptions.
+type ProducerOption struct {
+	// Apply receives the ProducerOptions that this option modifies.
+	Apply func(*ProducerOptions)
+}
+
+// NewProducerOption wraps f in a new ProducerOption, storing it as the
+// option's Apply function.
+func NewProducerOption(f func(options *ProducerOptions)) *ProducerOption {
+	opt := new(ProducerOption)
+	opt.Apply = f
+	return opt
+}
+
+// WithProducerInterceptor returns a ProducerOption that appends the single
+// interceptor f to ProducerOptions.Interceptors.
+func WithProducerInterceptor(f PInterceptor) *ProducerOption {
+	appendOne := func(options *ProducerOptions) {
+		options.Interceptors = append(options.Interceptors, f)
+	}
+	return NewProducerOption(appendOne)
+}
+
+// WithChainProducerInterceptor returns a ProducerOption that appends all of fs,
+// in argument order, to ProducerOptions.Interceptors.
+// The first interceptor will be the outermost, while the last interceptor will
+// be the innermost wrapper around the real call.
+func WithChainProducerInterceptor(fs ...PInterceptor) *ProducerOption {
+	appendAll := func(options *ProducerOptions) {
+		options.Interceptors = append(options.Interceptors, fs...)
+	}
+	return NewProducerOption(appendAll)
+}
+
+// WithRetry returns a ProducerOption that sets RetryTimesWhenSendFailed, the
+// number of retries when a send fails, to retries.
+// TODO: use a retry middleware (interceptor) instead.
+func WithRetry(retries int) *ProducerOption {
+	setRetries := func(options *ProducerOptions) {
+		options.RetryTimesWhenSendFailed = retries
+	}
+	return NewProducerOption(setRetries)
+}
+
+type ConsumerOptions struct {
 	ClientOption
 	NameServerAddr string
 
@@ -92,7 +137,7 @@ type ConsumerOption struct {
 
 	// Max re-consume times. -1 means 16 times.
 	//
-	// If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion
+	// If messages are re-consumed more than {@link #maxReconsumeTimes} times before success, they are directed to a deletion
 	// queue waiting.
 	MaxReconsumeTimes int
 
@@ -107,6 +152,56 @@ type ConsumerOption struct {
 	ConsumeOrderly bool
 	FromWhere      ConsumeFromWhere
 	// TODO traceDispatcher
+
+	Interceptors []CInterceptor
+}
+
+// DefaultPushConsumerOptions returns a ConsumerOptions with InstanceName
+// "DEFAULT", ClientIP taken from utils.LocalIP(), and Strategy set to
+// AllocateByAveragely; every other field keeps its zero value.
+func DefaultPushConsumerOptions() ConsumerOptions {
+	var opts ConsumerOptions
+	opts.ClientOption = ClientOption{
+		InstanceName: "DEFAULT",
+		ClientIP:     utils.LocalIP(),
+	}
+	opts.Strategy = AllocateByAveragely
+	return opts
+}
+
+// ConsumerOption configures a consumer by setting values on a ConsumerOptions.
+type ConsumerOption struct {
+	// Apply receives the ConsumerOptions that this option modifies.
+	Apply func(*ConsumerOptions)
+}
+
+// NewConsumerOption wraps f in a new ConsumerOption, storing it as the
+// option's Apply function.
+func NewConsumerOption(f func(*ConsumerOptions)) *ConsumerOption {
+	opt := new(ConsumerOption)
+	opt.Apply = f
+	return opt
+}
+
+// WithConsumerModel returns a ConsumerOption that sets
+// ConsumerOptions.ConsumerModel to m.
+func WithConsumerModel(m MessageModel) *ConsumerOption {
+	setModel := func(options *ConsumerOptions) {
+		options.ConsumerModel = m
+	}
+	return NewConsumerOption(setModel)
+}
+
+// WithConsumeFromWhere returns a ConsumerOption that sets
+// ConsumerOptions.FromWhere to w.
+func WithConsumeFromWhere(w ConsumeFromWhere) *ConsumerOption {
+	setFromWhere := func(options *ConsumerOptions) {
+		options.FromWhere = w
+	}
+	return NewConsumerOption(setFromWhere)
+}
+
+// WithConsumerInterceptor returns a ConsumerOption that appends the single
+// interceptor f to ConsumerOptions.Interceptors.
+func WithConsumerInterceptor(f CInterceptor) *ConsumerOption {
+	appendOne := func(options *ConsumerOptions) {
+		options.Interceptors = append(options.Interceptors, f)
+	}
+	return NewConsumerOption(appendOne)
+}
+
+// WithChainConsumerInterceptor returns a ConsumerOption that appends all of fs,
+// in argument order, to ConsumerOptions.Interceptors.
+// The first interceptor will be the outermost, while the last interceptor will
+// be the innermost wrapper around the real call.
+func WithChainConsumerInterceptor(fs ...CInterceptor) *ConsumerOption {
+	appendAll := func(options *ConsumerOptions) {
+		options.Interceptors = append(options.Interceptors, fs...)
+	}
+	return NewConsumerOption(appendAll)
 }
 
 func (opt *ClientOption) ChangeInstanceNameToPID() {
diff --git a/primitive/result.go b/primitive/result.go
index c37358c..628f243 100644
--- a/primitive/result.go
+++ b/primitive/result.go
@@ -56,10 +56,10 @@ func (result *SendResult) String() string {
 		result.Status, result.MsgID, result.OffsetMsgID, result.QueueOffset, result.MessageQueue.String())
 }
 
-// PullStatus pull status
+// PullStatus is the status of a pull.
 type PullStatus int
 
-// predefined pull status
+// Predefined pull statuses.
 const (
 	PullFound PullStatus = iota
 	PullNoNewMsg
@@ -75,7 +75,11 @@ type PullResult struct {
 	MaxOffset            int64
 	Status               PullStatus
 	SuggestWhichBrokerId int64
+
+	// messageExts message info
 	messageExts          []*MessageExt
+	// body holds the raw bytes stored via SetBody and returned by GetBody
+	body []byte
 }
 
 func (result *PullResult) GetMessageExts() []*MessageExt {
@@ -93,11 +97,19 @@ func (result *PullResult) GetMessages() []*Message {
 	return toMessages(result.messageExts)
 }
 
+// SetBody stores data in the result's body field. The slice is kept as-is
+// (not copied), so the result shares its backing array with the caller.
+func (result *PullResult) SetBody(data []byte) {
+	result.body = data
+}
+
+// GetBody returns the bytes previously stored with SetBody (nil if none were
+// set). The slice is returned without copying.
+func (result *PullResult) GetBody() []byte {
+	return result.body
+}
+
 func (result *PullResult) String() string {
 	return ""
 }
 
-func decodeMessage(data []byte) []*MessageExt {
+func DecodeMessage(data []byte) []*MessageExt {
 	msgs := make([]*MessageExt, 0)
 	buf := bytes.NewBuffer(data)
 	count := 0