You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/05/26 05:46:46 UTC

[rocketmq-client-go] branch master updated: 1. support get consumerRunningInfo return goroutine stack

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new b815a70  1. support get consumerRunningInfo return goroutine stack
     new 12ce1b9  Merge pull request #826 from WJL3333/support_goroutine_stack
b815a70 is described below

commit b815a7035ebd48f163ec9d20b5c7f7c609198a4d
Author: wangjinlong.1048576 <wa...@bytedance.com>
AuthorDate: Fri May 13 18:11:57 2022 +0800

    1. support get consumerRunningInfo return goroutine stack
---
 consumer/push_consumer.go | 16 +++++++++++++++-
 internal/client.go        |  8 ++++----
 internal/model.go         |  7 ++++++-
 internal/request.go       |  7 +++++++
 4 files changed, 32 insertions(+), 6 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 5a9f64a..801f412 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -22,6 +22,7 @@ import (
 	"fmt"
 	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
 	"math"
+	"runtime/pprof"
 	"strconv"
 	"strings"
 	"sync"
@@ -352,7 +353,7 @@ func (pc *pushConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, broker
 	return res
 }
 
-func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo {
+func (pc *pushConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRunningInfo {
 	info := internal.NewConsumerRunningInfo()
 
 	pc.subscriptionDataTable.Range(func(key, value interface{}) bool {
@@ -379,6 +380,19 @@ func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo {
 		return true
 	})
 
+	if stack {
+		var buffer strings.Builder
+
+		err := pprof.Lookup("goroutine").WriteTo(&buffer, 2)
+		if err != nil {
+			rlog.Error("error when get stack ", map[string]interface{}{
+				"error": err,
+			})
+		} else {
+			info.JStack = buffer.String()
+		}
+	}
+
 	nsAddr := ""
 	for _, value := range pc.client.GetNameSrv().AddrList() {
 		nsAddr += fmt.Sprintf("%s;", value)
diff --git a/internal/client.go b/internal/client.go
index cc7a3ec..9965c23 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -85,7 +85,7 @@ type InnerConsumer interface {
 	SubscriptionDataList() []*SubscriptionData
 	Rebalance()
 	IsUnitMode() bool
-	GetConsumerRunningInfo() *ConsumerRunningInfo
+	GetConsumerRunningInfo(stack bool) *ConsumerRunningInfo
 	ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *ConsumeMessageDirectlyResult
 	GetcType() string
 	GetModel() string
@@ -270,7 +270,7 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R
 				cli, ok := val.(*rmqClient)
 				var runningInfo *ConsumerRunningInfo
 				if ok {
-					runningInfo = cli.getConsumerRunningInfo(header.consumerGroup)
+					runningInfo = cli.getConsumerRunningInfo(header.consumerGroup, header.jstackEnable)
 				}
 				if runningInfo != nil {
 					res.Code = ResSuccess
@@ -840,12 +840,12 @@ func (c *rmqClient) resetOffset(topic string, group string, offsetTable map[prim
 	consumer.(InnerConsumer).ResetOffset(topic, offsetTable)
 }
 
-func (c *rmqClient) getConsumerRunningInfo(group string) *ConsumerRunningInfo {
+func (c *rmqClient) getConsumerRunningInfo(group string, stack bool) *ConsumerRunningInfo {
 	consumer, exist := c.consumerMap.Load(group)
 	if !exist {
 		return nil
 	}
-	info := consumer.(InnerConsumer).GetConsumerRunningInfo()
+	info := consumer.(InnerConsumer).GetConsumerRunningInfo(stack)
 	if info != nil {
 		info.Properties[PropClientVersion] = clientVersion
 	}
diff --git a/internal/model.go b/internal/model.go
index 7a011d7..c248bb3 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -149,6 +149,7 @@ type ConsumerRunningInfo struct {
 	SubscriptionData map[*SubscriptionData]bool
 	MQTable          map[primitive.MessageQueue]ProcessQueueInfo
 	StatusTable      map[string]ConsumeStatus
+	JStack           string // just follow java request param name, but pass golang stack here.
 }
 
 func (info ConsumerRunningInfo) Encode() ([]byte, error) {
@@ -251,7 +252,11 @@ func (info ConsumerRunningInfo) Encode() ([]byte, error) {
 		tableJson = fmt.Sprintf("%s,%s:%s", tableJson, string(dataK), string(dataV))
 	}
 	tableJson = strings.TrimLeft(tableJson, ",")
-	jsonData = fmt.Sprintf("%s,\"%s\":%s}", jsonData, "mqTable", fmt.Sprintf("{%s}", tableJson))
+
+	jsonData = fmt.Sprintf("%s,\"%s\":%s, \"%s\":\"%s\" }",
+		jsonData, "mqTable", fmt.Sprintf("{%s}", tableJson),
+		"jstack", info.JStack)
+
 	return []byte(jsonData), nil
 }
 
diff --git a/internal/request.go b/internal/request.go
index 0e3d8e1..2a475e1 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -305,12 +305,14 @@ func (request *GetRouteInfoRequestHeader) Encode() map[string]string {
 type GetConsumerRunningInfoHeader struct {
 	consumerGroup string
 	clientID      string
+	jstackEnable  bool
 }
 
 func (request *GetConsumerRunningInfoHeader) Encode() map[string]string {
 	maps := make(map[string]string)
 	maps["consumerGroup"] = request.consumerGroup
 	maps["clientId"] = request.clientID
+	maps["jstackEnable"] = strconv.FormatBool(request.jstackEnable)
 	return maps
 }
 
@@ -325,6 +327,11 @@ func (request *GetConsumerRunningInfoHeader) Decode(properties map[string]string
 	if v, existed := properties["clientId"]; existed {
 		request.clientID = v
 	}
+
+	if v, existed := properties["jstackEnable"]; existed {
+		parseBool, _ := strconv.ParseBool(v)
+		request.jstackEnable = parseBool
+	}
 }
 
 type QueryMessageRequestHeader struct {