You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2020/03/27 02:44:04 UTC

[rocketmq-client-go] branch native updated: [ISSUE #462] fix the trace message was send failed. (#463)

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 9a58797  [ISSUE #462] fix the trace message was send failed. (#463)
9a58797 is described below

commit 9a587973e438ddb04edda6766fe494016c670064
Author: dinglei <li...@163.com>
AuthorDate: Fri Mar 27 10:43:53 2020 +0800

    [ISSUE #462] fix the trace message was send failed. (#463)
    
    * fix(trace): fix the trace message was send failed.
    
    * fix(bug): remove namespace in the trace message body
    
    * fix the processid error in client id.
---
 consumer/push_consumer.go |  5 ++++
 internal/client.go        | 20 +++++++++++++-
 internal/trace.go         | 68 +++++++++++++++++++++++++++++++++++++----------
 primitive/message.go      | 13 ++++++---
 primitive/trace.go        |  1 +
 5 files changed, 89 insertions(+), 18 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 321c59f..9118ae2 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -831,6 +831,11 @@ func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs []*primitive.M
 
 			msgCtx, _ := primitive.GetConsumerCtx(ctx)
 			msgCtx.Success = realReply.ConsumeResult == ConsumeSuccess
+			if realReply.ConsumeResult == ConsumeSuccess {
+				msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
+			} else {
+				msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
+			}
 			return e
 		})
 		return container.ConsumeResult, err
diff --git a/internal/client.go b/internal/client.go
index 90c9b0c..6110dba 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -393,7 +393,12 @@ func (c *rmqClient) Shutdown() {
 }
 
 func (c *rmqClient) ClientID() string {
-	id := c.option.ClientIP + "@" + c.option.InstanceName
+	id := c.option.ClientIP + "@"
+	if c.option.InstanceName == "DEFAULT" {
+		id += strconv.Itoa(os.Getpid())
+	} else {
+		id += c.option.InstanceName
+	}
 	if c.option.UnitName != "" {
 		id += "@" + c.option.UnitName
 	}
@@ -466,6 +471,19 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
 		brokerName := key.(string)
 		data := value.(*BrokerData)
 		for id, addr := range data.BrokerAddresses {
+			rlog.Debug("try to send heart beat to broker", map[string]interface{}{
+				"brokerName": brokerName,
+				"brokerId":   id,
+				"brokerAddr": addr,
+			})
+			if hbData.ConsumerDatas.Len() == 0 && id != 0 {
+				rlog.Debug("notice, will not send heart beat to broker", map[string]interface{}{
+					"brokerName": brokerName,
+					"brokerId":   id,
+					"brokerAddr": addr,
+				})
+				continue
+			}
 			cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())
 
 			ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
diff --git a/internal/trace.go b/internal/trace.go
index 473b80f..48b257f 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -89,9 +89,21 @@ func (ctx *TraceContext) marshal2Bean() *TraceTransferBean {
 		buffer.WriteRune(contentSplitter)
 		buffer.WriteString(ctx.RegionId)
 		buffer.WriteRune(contentSplitter)
-		buffer.WriteString(ctx.GroupName)
+		ss := strings.Split(ctx.GroupName, "%")
+		if len(ss) == 2 {
+			buffer.WriteString(ss[1])
+		} else {
+			buffer.WriteString(ctx.GroupName)
+		}
+
 		buffer.WriteRune(contentSplitter)
-		buffer.WriteString(bean.Topic)
+		ssTopic := strings.Split(bean.Topic, "%")
+		if len(ssTopic) == 2 {
+			buffer.WriteString(ssTopic[1])
+		} else {
+			buffer.WriteString(bean.Topic)
+		}
+		//buffer.WriteString(bean.Topic)
 		buffer.WriteRune(contentSplitter)
 		buffer.WriteString(bean.MsgId)
 		buffer.WriteRune(contentSplitter)
@@ -119,7 +131,12 @@ func (ctx *TraceContext) marshal2Bean() *TraceTransferBean {
 			buffer.WriteRune(contentSplitter)
 			buffer.WriteString(ctx.RegionId)
 			buffer.WriteRune(contentSplitter)
-			buffer.WriteString(ctx.GroupName)
+			ss := strings.Split(ctx.GroupName, "%")
+			if len(ss) == 2 {
+				buffer.WriteString(ss[1])
+			} else {
+				buffer.WriteString(ctx.GroupName)
+			}
 			buffer.WriteRune(contentSplitter)
 			buffer.WriteString(ctx.RequestId)
 			buffer.WriteRune(contentSplitter)
@@ -233,6 +250,9 @@ func NewTraceDispatcher(traceCfg *primitive.TraceConfig) *traceDispatcher {
 	}
 
 	cliOp := DefaultClientOptions()
+	cliOp.GroupName = traceCfg.GroupName
+	cliOp.NameServerAddrs = traceCfg.NamesrvAddrs
+	cliOp.InstanceName = "INNER_TRACE_CLIENT_DEFAULT"
 	cliOp.RetryTimes = 0
 	cliOp.Namesrv = srvs
 	cliOp.Credentials = traceCfg.Credentials
@@ -301,8 +321,9 @@ func (td *traceDispatcher) process() {
 			batch = append(batch, ctx)
 			if count == batchSize {
 				count = 0
+				batchSend := batch
 				go primitive.WithRecover(func() {
-					td.batchCommit(batch)
+					td.batchCommit(batchSend)
 				})
 				batch = make([]TraceContext, 0)
 			}
@@ -312,15 +333,17 @@ func (td *traceDispatcher) process() {
 				count++
 				lastput = time.Now()
 				if len(batch) > 0 {
+					batchSend := batch
 					go primitive.WithRecover(func() {
-						td.batchCommit(batch)
+						td.batchCommit(batchSend)
 					})
 					batch = make([]TraceContext, 0)
 				}
 			}
 		case <-td.ctx.Done():
+			batchSend := batch
 			go primitive.WithRecover(func() {
-				td.batchCommit(batch)
+				td.batchCommit(batchSend)
 			})
 			batch = make([]TraceContext, 0)
 
@@ -403,10 +426,14 @@ func (td *traceDispatcher) flush(topic, regionID string, data []TraceTransferBea
 }
 
 func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, data string) {
-	msg := primitive.NewMessage(td.traceTopic, []byte(data))
+	traceTopic := td.traceTopic
+	if td.access == primitive.Cloud {
+		traceTopic = td.traceTopic + regionID
+	}
+	msg := primitive.NewMessage(traceTopic, []byte(data))
 	msg.WithKeys(keySet.slice())
 
-	mq, addr := td.findMq()
+	mq, addr := td.findMq(regionID)
 	if mq == nil {
 		return
 	}
@@ -414,19 +441,32 @@ func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, dat
 	var req = td.buildSendRequest(mq, msg)
 	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
 	err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) {
+		resp := new(primitive.SendResult)
 		if e != nil {
-			rlog.Error("send trace data error", map[string]interface{}{
+			rlog.Info("send trace data error.", map[string]interface{}{
 				"traceData": data,
 			})
+		} else {
+			td.cli.ProcessSendResponse(mq.BrokerName, command, resp, msg)
+			rlog.Debug("send trace data success:", map[string]interface{}{
+				"SendResult": resp,
+				"traceData":  data,
+			})
 		}
 	})
-	rlog.Error("send trace data error when invoke", map[string]interface{}{
-		rlog.LogKeyUnderlayError: err,
-	})
+	if err != nil {
+		rlog.Info("send trace data error when invoke", map[string]interface{}{
+			rlog.LogKeyUnderlayError: err,
+		})
+	}
 }
 
-func (td *traceDispatcher) findMq() (*primitive.MessageQueue, string) {
-	mqs, err := td.namesrvs.FetchPublishMessageQueues(td.traceTopic)
+func (td *traceDispatcher) findMq(regionID string) (*primitive.MessageQueue, string) {
+	traceTopic := td.traceTopic
+	if td.access == primitive.Cloud {
+		traceTopic = td.traceTopic + regionID
+	}
+	mqs, err := td.namesrvs.FetchPublishMessageQueues(traceTopic)
 	if err != nil {
 		rlog.Error("fetch publish message queues failed", map[string]interface{}{
 			rlog.LogKeyUnderlayError: err,
diff --git a/primitive/message.go b/primitive/message.go
index 7d12edb..6a84477 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -237,6 +237,7 @@ func (m *Message) Marshal() []byte {
 type MessageExt struct {
 	Message
 	MsgId                     string
+	OffsetMsgId               string
 	StoreSize                 int32
 	QueueOffset               int64
 	SysFlag                   int32
@@ -263,9 +264,9 @@ func (msgExt *MessageExt) IsTraceOn() string {
 }
 
 func (msgExt *MessageExt) String() string {
-	return fmt.Sprintf("[Message=%s, MsgId=%s, QueueId=%d, StoreSize=%d, QueueOffset=%d, SysFlag=%d, "+
+	return fmt.Sprintf("[Message=%s, MsgId=%s, OffsetMsgId=%s,QueueId=%d, StoreSize=%d, QueueOffset=%d, SysFlag=%d, "+
 		"BornTimestamp=%d, BornHost=%s, StoreTimestamp=%d, StoreHost=%s, CommitLogOffset=%d, BodyCRC=%d, "+
-		"ReconsumeTimes=%d, PreparedTransactionOffset=%d]", msgExt.Message.String(), msgExt.MsgId, msgExt.Queue.QueueId,
+		"ReconsumeTimes=%d, PreparedTransactionOffset=%d]", msgExt.Message.String(), msgExt.MsgId, msgExt.OffsetMsgId, msgExt.Queue.QueueId,
 		msgExt.StoreSize, msgExt.QueueOffset, msgExt.SysFlag, msgExt.BornTimestamp, msgExt.BornHost,
 		msgExt.StoreTimestamp, msgExt.StoreHost, msgExt.CommitLogOffset, msgExt.BodyCRC, msgExt.ReconsumeTimes,
 		msgExt.PreparedTransactionOffset)
@@ -364,11 +365,17 @@ func DecodeMessage(data []byte) []*MessageExt {
 		}
 		count += 2 + int(propertiesLength)
 
-		msg.MsgId = CreateMessageId(hostBytes, port, msg.CommitLogOffset)
+		msg.OffsetMsgId = CreateMessageId(hostBytes, port, msg.CommitLogOffset)
 		//count += 16
 		if msg.properties == nil {
 			msg.properties = make(map[string]string, 0)
 		}
+		msgID := msg.GetProperty(PropertyUniqueClientMessageIdKeyIndex)
+		if len(msgID) == 0 {
+			msg.MsgId = msg.OffsetMsgId
+		} else {
+			msg.MsgId = msgID
+		}
 		msgs = append(msgs, msg)
 	}
 
diff --git a/primitive/trace.go b/primitive/trace.go
index c898623..c0df5b3 100644
--- a/primitive/trace.go
+++ b/primitive/trace.go
@@ -20,6 +20,7 @@ package primitive
 // config for message trace.
 type TraceConfig struct {
 	TraceTopic   string
+	GroupName    string
 	Access       AccessChannel
 	NamesrvAddrs []string
 	Credentials  // acl config for trace. omit if acl is closed on broker.