You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/05/26 05:46:46 UTC
[rocketmq-client-go] branch master updated: 1. support get consumerRunningInfo return goroutine stack
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new b815a70 1. support get consumerRunningInfo return goroutine stack
new 12ce1b9 Merge pull request #826 from WJL3333/support_goroutine_stack
b815a70 is described below
commit b815a7035ebd48f163ec9d20b5c7f7c609198a4d
Author: wangjinlong.1048576 <wa...@bytedance.com>
AuthorDate: Fri May 13 18:11:57 2022 +0800
1. support get consumerRunningInfo return goroutine stack
---
consumer/push_consumer.go | 16 +++++++++++++++-
internal/client.go | 8 ++++----
internal/model.go | 7 ++++++-
internal/request.go | 7 +++++++
4 files changed, 32 insertions(+), 6 deletions(-)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 5a9f64a..801f412 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -22,6 +22,7 @@ import (
"fmt"
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
"math"
+ "runtime/pprof"
"strconv"
"strings"
"sync"
@@ -352,7 +353,7 @@ func (pc *pushConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, broker
return res
}
-func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo {
+func (pc *pushConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRunningInfo {
info := internal.NewConsumerRunningInfo()
pc.subscriptionDataTable.Range(func(key, value interface{}) bool {
@@ -379,6 +380,19 @@ func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo {
return true
})
+ if stack {
+ var buffer strings.Builder
+
+ err := pprof.Lookup("goroutine").WriteTo(&buffer, 2)
+ if err != nil {
+ rlog.Error("error when get stack ", map[string]interface{}{
+ "error": err,
+ })
+ } else {
+ info.JStack = buffer.String()
+ }
+ }
+
nsAddr := ""
for _, value := range pc.client.GetNameSrv().AddrList() {
nsAddr += fmt.Sprintf("%s;", value)
diff --git a/internal/client.go b/internal/client.go
index cc7a3ec..9965c23 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -85,7 +85,7 @@ type InnerConsumer interface {
SubscriptionDataList() []*SubscriptionData
Rebalance()
IsUnitMode() bool
- GetConsumerRunningInfo() *ConsumerRunningInfo
+ GetConsumerRunningInfo(stack bool) *ConsumerRunningInfo
ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *ConsumeMessageDirectlyResult
GetcType() string
GetModel() string
@@ -270,7 +270,7 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R
cli, ok := val.(*rmqClient)
var runningInfo *ConsumerRunningInfo
if ok {
- runningInfo = cli.getConsumerRunningInfo(header.consumerGroup)
+ runningInfo = cli.getConsumerRunningInfo(header.consumerGroup, header.jstackEnable)
}
if runningInfo != nil {
res.Code = ResSuccess
@@ -840,12 +840,12 @@ func (c *rmqClient) resetOffset(topic string, group string, offsetTable map[prim
consumer.(InnerConsumer).ResetOffset(topic, offsetTable)
}
-func (c *rmqClient) getConsumerRunningInfo(group string) *ConsumerRunningInfo {
+func (c *rmqClient) getConsumerRunningInfo(group string, stack bool) *ConsumerRunningInfo {
consumer, exist := c.consumerMap.Load(group)
if !exist {
return nil
}
- info := consumer.(InnerConsumer).GetConsumerRunningInfo()
+ info := consumer.(InnerConsumer).GetConsumerRunningInfo(stack)
if info != nil {
info.Properties[PropClientVersion] = clientVersion
}
diff --git a/internal/model.go b/internal/model.go
index 7a011d7..c248bb3 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -149,6 +149,7 @@ type ConsumerRunningInfo struct {
SubscriptionData map[*SubscriptionData]bool
MQTable map[primitive.MessageQueue]ProcessQueueInfo
StatusTable map[string]ConsumeStatus
+ JStack string // just follow java request param name, but pass golang stack here.
}
func (info ConsumerRunningInfo) Encode() ([]byte, error) {
@@ -251,7 +252,11 @@ func (info ConsumerRunningInfo) Encode() ([]byte, error) {
tableJson = fmt.Sprintf("%s,%s:%s", tableJson, string(dataK), string(dataV))
}
tableJson = strings.TrimLeft(tableJson, ",")
- jsonData = fmt.Sprintf("%s,\"%s\":%s}", jsonData, "mqTable", fmt.Sprintf("{%s}", tableJson))
+
+ jsonData = fmt.Sprintf("%s,\"%s\":%s, \"%s\":\"%s\" }",
+ jsonData, "mqTable", fmt.Sprintf("{%s}", tableJson),
+ "jstack", info.JStack)
+
return []byte(jsonData), nil
}
diff --git a/internal/request.go b/internal/request.go
index 0e3d8e1..2a475e1 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -305,12 +305,14 @@ func (request *GetRouteInfoRequestHeader) Encode() map[string]string {
type GetConsumerRunningInfoHeader struct {
consumerGroup string
clientID string
+ jstackEnable bool
}
func (request *GetConsumerRunningInfoHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["consumerGroup"] = request.consumerGroup
maps["clientId"] = request.clientID
+ maps["jstackEnable"] = strconv.FormatBool(request.jstackEnable)
return maps
}
@@ -325,6 +327,11 @@ func (request *GetConsumerRunningInfoHeader) Decode(properties map[string]string
if v, existed := properties["clientId"]; existed {
request.clientID = v
}
+
+ if v, existed := properties["jstackEnable"]; existed {
+ parseBool, _ := strconv.ParseBool(v)
+ request.jstackEnable = parseBool
+ }
}
type QueryMessageRequestHeader struct {