You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2021/03/16 11:38:53 UTC

[rocketmq-client-go] branch master updated: [ISSUE #621] Support consume message directly (#622)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c97d492  [ISSUE #621] Support consume message directly (#622)
c97d492 is described below

commit c97d492fa2505361021311f8675464285563c6e5
Author: 张旭 <ma...@gmail.com>
AuthorDate: Tue Mar 16 19:38:44 2021 +0800

    [ISSUE #621] Support consume message directly (#622)
    
    * Support consume message directly
---
 consumer/push_consumer.go | 53 +++++++++++++++++++++++++++++++++++++++++++++++
 internal/client.go        | 40 +++++++++++++++++++++++++++++++++++
 internal/model.go         | 27 ++++++++++++++++++++++++
 internal/model_test.go    | 42 +++++++++++++++++++++++++++++++++++++
 internal/request.go       | 38 +++++++++++++++++++++++++++++++++
 5 files changed, 200 insertions(+)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index f13d5e2..8fb4637 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -291,6 +291,59 @@ func (pc *pushConsumer) GetWhere() string {
 
 }
 
+func (pc *pushConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *internal.ConsumeMessageDirectlyResult {
+	var msgs = []*primitive.MessageExt{msg}
+	var mq = &primitive.MessageQueue{
+		Topic:      msg.Topic,
+		BrokerName: brokerName,
+		QueueId:    msg.Queue.QueueId,
+	}
+
+	beginTime := time.Now()
+	pc.resetRetryAndNamespace(msgs)
+	var result ConsumeResult
+
+	var err error
+	msgCtx := &primitive.ConsumeMessageContext{
+		Properties:    make(map[string]string),
+		ConsumerGroup: pc.consumerGroup,
+		MQ:            mq,
+		Msgs:          msgs,
+	}
+	ctx := context.Background()
+	ctx = primitive.WithConsumerCtx(ctx, msgCtx)
+	ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
+	concurrentCtx := primitive.NewConsumeConcurrentlyContext()
+	concurrentCtx.MQ = *mq
+	ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx)
+
+	result, err = pc.consumeInner(ctx, msgs)
+
+	consumeRT := time.Now().Sub(beginTime)
+
+	res := &internal.ConsumeMessageDirectlyResult{
+		Order:          false,
+		AutoCommit:     true,
+		SpentTimeMills: int64(consumeRT / time.Millisecond),
+	}
+
+	if err != nil {
+		msgCtx.Properties[primitive.PropCtxType] = string(primitive.ExceptionReturn)
+		res.ConsumeResult = internal.ThrowException
+		res.Remark = err.Error()
+	} else if result == ConsumeSuccess {
+		msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
+		res.ConsumeResult = internal.ConsumeSuccess
+	} else if result == ConsumeRetryLater {
+		msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
+		res.ConsumeResult = internal.ConsumeRetryLater
+	}
+
+	increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))
+
+	return res
+}
+
 func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo {
 	info := internal.NewConsumerRunningInfo()
 
diff --git a/internal/client.go b/internal/client.go
index e2264a7..bc523ab 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -84,6 +84,7 @@ type InnerConsumer interface {
 	Rebalance()
 	IsUnitMode() bool
 	GetConsumerRunningInfo() *ConsumerRunningInfo
+	ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *ConsumeMessageDirectlyResult
 	GetcType() string
 	GetModel() string
 	GetWhere() string
@@ -252,6 +253,36 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R
 			}
 			return res
 		})
+
+		// Handle ReqConsumeMessageDirectly: decode the header, find the client
+		// instance named by the header's clientId, let it consume the message
+		// carried in the request body, and reply with the JSON-encoded result.
+		// The reply carries ResSuccess only when a result was produced AND
+		// encoded; every other path replies ResError with an explanatory remark.
+		client.remoteClient.RegisterRequestFunc(ReqConsumeMessageDirectly, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
+			rlog.Info("receive consume message directly request...", nil)
+			header := new(ConsumeMessageDirectlyHeader)
+			header.Decode(req.ExtFields)
+
+			// Start from an error reply; it is promoted to success at the very end.
+			res := remote.NewRemotingCommand(ResError, nil, nil)
+
+			val, exist := clientMap.Load(header.clientID)
+			if !exist {
+				res.Remark = fmt.Sprintf("Can't find specified client instance of: %s", header.clientID)
+				return res
+			}
+
+			var consumeMessageDirectlyResult *ConsumeMessageDirectlyResult
+			if cli, ok := val.(*rmqClient); ok {
+				// Only index the decoded slice when it holds a message; an empty
+				// or undecodable body must not panic the request handler.
+				if msgs := primitive.DecodeMessage(req.Body); len(msgs) > 0 {
+					consumeMessageDirectlyResult = cli.consumeMessageDirectly(msgs[0], header.consumerGroup, header.brokerName)
+				}
+			}
+			if consumeMessageDirectlyResult == nil {
+				res.Remark = "there is unexpected error when consume message directly, please check log"
+				return res
+			}
+
+			data, err := consumeMessageDirectlyResult.Encode()
+			if err != nil {
+				// Keep the ResError code: a reply without a body is not a success.
+				res.Remark = fmt.Sprintf("json marshal error: %s", err.Error())
+				return res
+			}
+			res.Code = ResSuccess
+			res.Body = data
+			return res
+		})
 	}
 	return actual.(*rmqClient)
 }
@@ -744,6 +775,15 @@ func (c *rmqClient) getConsumerRunningInfo(group string) *ConsumerRunningInfo {
 	return info
 }
 
+// consumeMessageDirectly looks up the consumer stored under group in
+// c.consumerMap and asks it to consume msg directly for brokerName.
+// It returns nil when no consumer is stored under that group.
+func (c *rmqClient) consumeMessageDirectly(msg *primitive.MessageExt, group string, brokerName string) *ConsumeMessageDirectlyResult {
+	if v, found := c.consumerMap.Load(group); found {
+		return v.(InnerConsumer).ConsumeMessageDirectly(msg, brokerName)
+	}
+	return nil
+}
+
 func routeData2SubscribeInfo(topic string, data *TopicRouteData) []*primitive.MessageQueue {
 	list := make([]*primitive.MessageQueue, 0)
 	for idx := range data.QueueDataList {
diff --git a/internal/model.go b/internal/model.go
index 0ee9ccc..934c610 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -261,3 +261,30 @@ func NewConsumerRunningInfo() *ConsumerRunningInfo {
 		StatusTable:      make(map[string]ConsumeStatus),
 	}
 }
+
+// ConsumeMessageDirectlyResult describes the outcome of consuming a single
+// message directly. It is serialised to JSON with the field names given in the
+// struct tags below.
+//
+// ConsumeResult carries the status code, Remark a free-text explanation, and
+// SpentTimeMills a duration as an integer.
+// NOTE(review): SpentTimeMills is presumably milliseconds (per the name) —
+// confirm against the producer of this value.
+type ConsumeMessageDirectlyResult struct {
+	Order          bool          `json:"order"`
+	AutoCommit     bool          `json:"autoCommit"`
+	ConsumeResult  ConsumeResult `json:"consumeResult"`
+	Remark         string        `json:"remark"`
+	SpentTimeMills int64         `json:"spentTimeMills"`
+}
+
+// ConsumeResult is the status code stored in
+// ConsumeMessageDirectlyResult.ConsumeResult. Its underlying type is int, so
+// encoding/json emits it as a JSON number.
+type ConsumeResult int
+
+// ConsumeResult values, numbered 0 through 5 by iota in declaration order.
+// NOTE(review): the numeric values are what goes over the wire; presumably the
+// order has to match the status enum expected by the receiving side — confirm
+// before reordering or inserting values.
+const (
+	ConsumeSuccess ConsumeResult = iota
+	ConsumeRetryLater
+	Rollback
+	Commit
+	ThrowException
+	ReturnNull
+)
+
+// Encode serialises result to JSON using the struct's json tags.
+// It returns the encoded bytes, or a nil slice and the error reported by
+// json.Marshal.
+func (result ConsumeMessageDirectlyResult) Encode() ([]byte, error) {
+	return json.Marshal(result)
+}
diff --git a/internal/model_test.go b/internal/model_test.go
index 0d8dcd7..57ff0af 100644
--- a/internal/model_test.go
+++ b/internal/model_test.go
@@ -362,3 +362,45 @@ func TestConsumerRunningInfo_MarshalJSON(t *testing.T) {
 		})
 	})
 }
+
+// TestConsumeMessageDirectlyResult_MarshalJSON verifies the exact JSON produced
+// by ConsumeMessageDirectlyResult.Encode for several consume outcomes, so that a
+// changed field tag or a renumbered ConsumeResult constant fails the test.
+func TestConsumeMessageDirectlyResult_MarshalJSON(t *testing.T) {
+	Convey("test ConsumeMessageDirectlyResult MarshalJson", t, func() {
+		Convey("test consume success", func() {
+			consumeMessageDirectlyResult := ConsumeMessageDirectlyResult{
+				Order:          false,
+				AutoCommit:     true,
+				SpentTimeMills: 2,
+			}
+			consumeMessageDirectlyResult.ConsumeResult = ConsumeSuccess
+			data, err := consumeMessageDirectlyResult.Encode()
+			So(err, ShouldBeNil)
+			So(string(data), ShouldEqual, `{"order":false,"autoCommit":true,"consumeResult":0,"remark":"","spentTimeMills":2}`)
+		})
+
+		Convey("test consume timeout", func() {
+			consumeResult := ConsumeMessageDirectlyResult{
+				Order:          false,
+				AutoCommit:     true,
+				SpentTimeMills: 2,
+			}
+			consumeResult.ConsumeResult = ReturnNull
+			data, err := consumeResult.Encode()
+			So(err, ShouldBeNil)
+			So(string(data), ShouldEqual, `{"order":false,"autoCommit":true,"consumeResult":5,"remark":"","spentTimeMills":2}`)
+		})
+
+		Convey("test consume exception", func() {
+			consumeResult := ConsumeMessageDirectlyResult{
+				Order:          false,
+				AutoCommit:     true,
+				SpentTimeMills: 5,
+			}
+			consumeResult.ConsumeResult = ThrowException
+			consumeResult.Remark = "Unknown Exception"
+			data, err := consumeResult.Encode()
+			So(err, ShouldBeNil)
+			So(string(data), ShouldEqual, `{"order":false,"autoCommit":true,"consumeResult":4,"remark":"Unknown Exception","spentTimeMills":5}`)
+		})
+	})
+}
diff --git a/internal/request.go b/internal/request.go
index fa88efe..ed3de33 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -407,3 +407,41 @@ func (request *DeleteTopicRequestHeader) Encode() map[string]string {
 
 	return maps
 }
+
+// ConsumeMessageDirectlyHeader holds the header fields of a
+// consume-message-directly request. Its Encode and Decode methods map the
+// fields to and from the string keys "consumerGroup", "clientId", "msgId" and
+// "brokerName". All fields are unexported, so they can only be set from inside
+// this package or through Decode.
+type ConsumeMessageDirectlyHeader struct {
+	consumerGroup string
+	clientID      string
+	msgId         string
+	brokerName    string
+}
+
+// Encode returns a freshly allocated map holding the four header fields under
+// the keys "consumerGroup", "clientId", "msgId" and "brokerName". Every key is
+// always present, even when the corresponding field is empty.
+func (request *ConsumeMessageDirectlyHeader) Encode() map[string]string {
+	return map[string]string{
+		"consumerGroup": request.consumerGroup,
+		"clientId":      request.clientID,
+		"msgId":         request.msgId,
+		"brokerName":    request.brokerName,
+	}
+}
+
+// Decode fills request from properties. Only keys that are present overwrite
+// the corresponding field; absent keys leave the field untouched, and a nil or
+// empty map is a no-op.
+func (request *ConsumeMessageDirectlyHeader) Decode(properties map[string]string) {
+	if len(properties) == 0 {
+		return
+	}
+
+	// Pair every wire key with the field it populates.
+	fields := []struct {
+		key string
+		dst *string
+	}{
+		{"consumerGroup", &request.consumerGroup},
+		{"clientId", &request.clientID},
+		{"msgId", &request.msgId},
+		{"brokerName", &request.brokerName},
+	}
+	for _, f := range fields {
+		if v, ok := properties[f.key]; ok {
+			*f.dst = v
+		}
+	}
+}