You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2021/03/16 11:38:53 UTC
[rocketmq-client-go] branch master updated: [ISSUE #621] Support consume message directly (#622)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new c97d492 [ISSUE #621] Support consume message directly (#622)
c97d492 is described below
commit c97d492fa2505361021311f8675464285563c6e5
Author: 张旭 <ma...@gmail.com>
AuthorDate: Tue Mar 16 19:38:44 2021 +0800
[ISSUE #621] Support consume message directly (#622)
* Support consume message directly
---
consumer/push_consumer.go | 53 +++++++++++++++++++++++++++++++++++++++++++++++
internal/client.go | 40 +++++++++++++++++++++++++++++++++++
internal/model.go | 27 ++++++++++++++++++++++++
internal/model_test.go | 42 +++++++++++++++++++++++++++++++++++++
internal/request.go | 38 +++++++++++++++++++++++++++++++++
5 files changed, 200 insertions(+)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index f13d5e2..8fb4637 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -291,6 +291,59 @@ func (pc *pushConsumer) GetWhere() string {
}
+func (pc *pushConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *internal.ConsumeMessageDirectlyResult {
+ var msgs = []*primitive.MessageExt{msg}
+ var mq = &primitive.MessageQueue{
+ Topic: msg.Topic,
+ BrokerName: brokerName,
+ QueueId: msg.Queue.QueueId,
+ }
+
+ beginTime := time.Now()
+ pc.resetRetryAndNamespace(msgs)
+ var result ConsumeResult
+
+ var err error
+ msgCtx := &primitive.ConsumeMessageContext{
+ Properties: make(map[string]string),
+ ConsumerGroup: pc.consumerGroup,
+ MQ: mq,
+ Msgs: msgs,
+ }
+ ctx := context.Background()
+ ctx = primitive.WithConsumerCtx(ctx, msgCtx)
+ ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
+ concurrentCtx := primitive.NewConsumeConcurrentlyContext()
+ concurrentCtx.MQ = *mq
+ ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx)
+
+ result, err = pc.consumeInner(ctx, msgs)
+
+ consumeRT := time.Now().Sub(beginTime)
+
+ res := &internal.ConsumeMessageDirectlyResult{
+ Order: false,
+ AutoCommit: true,
+ SpentTimeMills: int64(consumeRT / time.Millisecond),
+ }
+
+ if err != nil {
+ msgCtx.Properties[primitive.PropCtxType] = string(primitive.ExceptionReturn)
+ res.ConsumeResult = internal.ThrowException
+ res.Remark = err.Error()
+ } else if result == ConsumeSuccess {
+ msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
+ res.ConsumeResult = internal.ConsumeSuccess
+ } else if result == ConsumeRetryLater {
+ msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
+ res.ConsumeResult = internal.ConsumeRetryLater
+ }
+
+ increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))
+
+ return res
+}
+
func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo {
info := internal.NewConsumerRunningInfo()
diff --git a/internal/client.go b/internal/client.go
index e2264a7..bc523ab 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -84,6 +84,7 @@ type InnerConsumer interface {
Rebalance()
IsUnitMode() bool
GetConsumerRunningInfo() *ConsumerRunningInfo
+ ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *ConsumeMessageDirectlyResult
GetcType() string
GetModel() string
GetWhere() string
@@ -252,6 +253,36 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R
}
return res
})
+
+ client.remoteClient.RegisterRequestFunc(ReqConsumeMessageDirectly, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
+ rlog.Info("receive consume message directly request...", nil)
+ header := new(ConsumeMessageDirectlyHeader)
+ header.Decode(req.ExtFields)
+ val, exist := clientMap.Load(header.clientID)
+ res := remote.NewRemotingCommand(ResError, nil, nil)
+ if !exist {
+ res.Remark = fmt.Sprintf("Can't find specified client instance of: %s", header.clientID)
+ } else {
+ cli, ok := val.(*rmqClient)
+ msg := primitive.DecodeMessage(req.Body)[0]
+ var consumeMessageDirectlyResult *ConsumeMessageDirectlyResult
+ if ok {
+ consumeMessageDirectlyResult = cli.consumeMessageDirectly(msg, header.consumerGroup, header.brokerName)
+ }
+ if consumeMessageDirectlyResult != nil {
+ res.Code = ResSuccess
+ data, err := consumeMessageDirectlyResult.Encode()
+ if err != nil {
+ res.Remark = fmt.Sprintf("json marshal error: %s", err.Error())
+ } else {
+ res.Body = data
+ }
+ } else {
+ res.Remark = "there is unexpected error when consume message directly, please check log"
+ }
+ }
+ return res
+ })
}
return actual.(*rmqClient)
}
@@ -744,6 +775,15 @@ func (c *rmqClient) getConsumerRunningInfo(group string) *ConsumerRunningInfo {
return info
}
+func (c *rmqClient) consumeMessageDirectly(msg *primitive.MessageExt, group string, brokerName string) *ConsumeMessageDirectlyResult {
+ consumer, exist := c.consumerMap.Load(group)
+ if !exist {
+ return nil
+ }
+ res := consumer.(InnerConsumer).ConsumeMessageDirectly(msg, brokerName)
+ return res
+}
+
func routeData2SubscribeInfo(topic string, data *TopicRouteData) []*primitive.MessageQueue {
list := make([]*primitive.MessageQueue, 0)
for idx := range data.QueueDataList {
diff --git a/internal/model.go b/internal/model.go
index 0ee9ccc..934c610 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -261,3 +261,30 @@ func NewConsumerRunningInfo() *ConsumerRunningInfo {
StatusTable: make(map[string]ConsumeStatus),
}
}
+
+type ConsumeMessageDirectlyResult struct {
+ Order bool `json:"order"`
+ AutoCommit bool `json:"autoCommit"`
+ ConsumeResult ConsumeResult `json:"consumeResult"`
+ Remark string `json:"remark"`
+ SpentTimeMills int64 `json:"spentTimeMills"`
+}
+
+type ConsumeResult int
+
+const (
+ ConsumeSuccess ConsumeResult = iota
+ ConsumeRetryLater
+ Rollback
+ Commit
+ ThrowException
+ ReturnNull
+)
+
+func (result ConsumeMessageDirectlyResult) Encode() ([]byte, error) {
+ data, err := json.Marshal(result)
+ if err != nil {
+ return nil, err
+ }
+ return data, nil
+}
diff --git a/internal/model_test.go b/internal/model_test.go
index 0d8dcd7..57ff0af 100644
--- a/internal/model_test.go
+++ b/internal/model_test.go
@@ -362,3 +362,45 @@ func TestConsumerRunningInfo_MarshalJSON(t *testing.T) {
})
})
}
+
+func TestConsumeMessageDirectlyResult_MarshalJSON(t *testing.T) {
+ Convey("test ConsumeMessageDirectlyResult MarshalJson", t, func() {
+ Convey("test consume success", func() {
+ consumeMessageDirectlyResult := ConsumeMessageDirectlyResult{
+ Order: false,
+ AutoCommit: true,
+ SpentTimeMills: 2,
+ }
+ consumeMessageDirectlyResult.ConsumeResult = ConsumeSuccess
+ data, err := consumeMessageDirectlyResult.Encode()
+ So(err, ShouldBeNil)
+ fmt.Printf("json consumeMessageDirectlyResult: %s\n", string(data))
+ })
+
+ Convey("test consume timeout", func() {
+ consumeResult := ConsumeMessageDirectlyResult{
+ Order: false,
+ AutoCommit: true,
+ SpentTimeMills: 2,
+ }
+ consumeResult.ConsumeResult = ReturnNull
+ data, err := consumeResult.Encode()
+ So(err, ShouldBeNil)
+ fmt.Printf("json consumeMessageDirectlyResult: %s\n", string(data))
+ })
+
+ Convey("test consume exception", func() {
+ consumeResult := ConsumeMessageDirectlyResult{
+ Order: false,
+ AutoCommit: true,
+ SpentTimeMills: 5,
+ }
+ consumeResult.ConsumeResult = ThrowException
+ consumeResult.Remark = "Unknown Exception"
+ data, err := consumeResult.Encode()
+ So(err, ShouldBeNil)
+ fmt.Printf("json consumeMessageDirectlyResult: %s\n", string(data))
+ })
+ })
+
+}
diff --git a/internal/request.go b/internal/request.go
index fa88efe..ed3de33 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -407,3 +407,41 @@ func (request *DeleteTopicRequestHeader) Encode() map[string]string {
return maps
}
+
+type ConsumeMessageDirectlyHeader struct {
+ consumerGroup string
+ clientID string
+ msgId string
+ brokerName string
+}
+
+func (request *ConsumeMessageDirectlyHeader) Encode() map[string]string {
+ maps := make(map[string]string)
+ maps["consumerGroup"] = request.consumerGroup
+ maps["clientId"] = request.clientID
+ maps["msgId"] = request.msgId
+ maps["brokerName"] = request.brokerName
+ return maps
+}
+
+func (request *ConsumeMessageDirectlyHeader) Decode(properties map[string]string) {
+ if len(properties) == 0 {
+ return
+ }
+
+ if v, existed := properties["consumerGroup"]; existed {
+ request.consumerGroup = v
+ }
+
+ if v, existed := properties["clientId"]; existed {
+ request.clientID = v
+ }
+
+ if v, existed := properties["msgId"]; existed {
+ request.msgId = v
+ }
+
+ if v, existed := properties["brokerName"]; existed {
+ request.brokerName = v
+ }
+}