You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2020/03/27 02:44:04 UTC
[rocketmq-client-go] branch native updated: [ISSUE #462] fix the
trace message was send failed. (#463)
This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 9a58797 [ISSUE #462] fix the trace message was send failed. (#463)
9a58797 is described below
commit 9a587973e438ddb04edda6766fe494016c670064
Author: dinglei <li...@163.com>
AuthorDate: Fri Mar 27 10:43:53 2020 +0800
[ISSUE #462] fix the trace message was send failed. (#463)
* fix(trace): fix the trace message was send failed.
* fix(bug): remove namespace in the trace message body
* fix the processid error in client id.
---
consumer/push_consumer.go | 5 ++++
internal/client.go | 20 +++++++++++++-
internal/trace.go | 68 +++++++++++++++++++++++++++++++++++++----------
primitive/message.go | 13 ++++++---
primitive/trace.go | 1 +
5 files changed, 89 insertions(+), 18 deletions(-)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 321c59f..9118ae2 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -831,6 +831,11 @@ func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs []*primitive.M
msgCtx, _ := primitive.GetConsumerCtx(ctx)
msgCtx.Success = realReply.ConsumeResult == ConsumeSuccess
+ if realReply.ConsumeResult == ConsumeSuccess {
+ msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
+ } else {
+ msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
+ }
return e
})
return container.ConsumeResult, err
diff --git a/internal/client.go b/internal/client.go
index 90c9b0c..6110dba 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -393,7 +393,12 @@ func (c *rmqClient) Shutdown() {
}
func (c *rmqClient) ClientID() string {
- id := c.option.ClientIP + "@" + c.option.InstanceName
+ id := c.option.ClientIP + "@"
+ if c.option.InstanceName == "DEFAULT" {
+ id += strconv.Itoa(os.Getpid())
+ } else {
+ id += c.option.InstanceName
+ }
if c.option.UnitName != "" {
id += "@" + c.option.UnitName
}
@@ -466,6 +471,19 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
brokerName := key.(string)
data := value.(*BrokerData)
for id, addr := range data.BrokerAddresses {
+ rlog.Debug("try to send heart beat to broker", map[string]interface{}{
+ "brokerName": brokerName,
+ "brokerId": id,
+ "brokerAddr": addr,
+ })
+ if hbData.ConsumerDatas.Len() == 0 && id != 0 {
+ rlog.Debug("notice, will not send heart beat to broker", map[string]interface{}{
+ "brokerName": brokerName,
+ "brokerId": id,
+ "brokerAddr": addr,
+ })
+ continue
+ }
cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
diff --git a/internal/trace.go b/internal/trace.go
index 473b80f..48b257f 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -89,9 +89,21 @@ func (ctx *TraceContext) marshal2Bean() *TraceTransferBean {
buffer.WriteRune(contentSplitter)
buffer.WriteString(ctx.RegionId)
buffer.WriteRune(contentSplitter)
- buffer.WriteString(ctx.GroupName)
+ ss := strings.Split(ctx.GroupName, "%")
+ if len(ss) == 2 {
+ buffer.WriteString(ss[1])
+ } else {
+ buffer.WriteString(ctx.GroupName)
+ }
+
buffer.WriteRune(contentSplitter)
- buffer.WriteString(bean.Topic)
+ ssTopic := strings.Split(bean.Topic, "%")
+ if len(ssTopic) == 2 {
+ buffer.WriteString(ssTopic[1])
+ } else {
+ buffer.WriteString(bean.Topic)
+ }
+ //buffer.WriteString(bean.Topic)
buffer.WriteRune(contentSplitter)
buffer.WriteString(bean.MsgId)
buffer.WriteRune(contentSplitter)
@@ -119,7 +131,12 @@ func (ctx *TraceContext) marshal2Bean() *TraceTransferBean {
buffer.WriteRune(contentSplitter)
buffer.WriteString(ctx.RegionId)
buffer.WriteRune(contentSplitter)
- buffer.WriteString(ctx.GroupName)
+ ss := strings.Split(ctx.GroupName, "%")
+ if len(ss) == 2 {
+ buffer.WriteString(ss[1])
+ } else {
+ buffer.WriteString(ctx.GroupName)
+ }
buffer.WriteRune(contentSplitter)
buffer.WriteString(ctx.RequestId)
buffer.WriteRune(contentSplitter)
@@ -233,6 +250,9 @@ func NewTraceDispatcher(traceCfg *primitive.TraceConfig) *traceDispatcher {
}
cliOp := DefaultClientOptions()
+ cliOp.GroupName = traceCfg.GroupName
+ cliOp.NameServerAddrs = traceCfg.NamesrvAddrs
+ cliOp.InstanceName = "INNER_TRACE_CLIENT_DEFAULT"
cliOp.RetryTimes = 0
cliOp.Namesrv = srvs
cliOp.Credentials = traceCfg.Credentials
@@ -301,8 +321,9 @@ func (td *traceDispatcher) process() {
batch = append(batch, ctx)
if count == batchSize {
count = 0
+ batchSend := batch
go primitive.WithRecover(func() {
- td.batchCommit(batch)
+ td.batchCommit(batchSend)
})
batch = make([]TraceContext, 0)
}
@@ -312,15 +333,17 @@ func (td *traceDispatcher) process() {
count++
lastput = time.Now()
if len(batch) > 0 {
+ batchSend := batch
go primitive.WithRecover(func() {
- td.batchCommit(batch)
+ td.batchCommit(batchSend)
})
batch = make([]TraceContext, 0)
}
}
case <-td.ctx.Done():
+ batchSend := batch
go primitive.WithRecover(func() {
- td.batchCommit(batch)
+ td.batchCommit(batchSend)
})
batch = make([]TraceContext, 0)
@@ -403,10 +426,14 @@ func (td *traceDispatcher) flush(topic, regionID string, data []TraceTransferBea
}
func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, data string) {
- msg := primitive.NewMessage(td.traceTopic, []byte(data))
+ traceTopic := td.traceTopic
+ if td.access == primitive.Cloud {
+ traceTopic = td.traceTopic + regionID
+ }
+ msg := primitive.NewMessage(traceTopic, []byte(data))
msg.WithKeys(keySet.slice())
- mq, addr := td.findMq()
+ mq, addr := td.findMq(regionID)
if mq == nil {
return
}
@@ -414,19 +441,32 @@ func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, dat
var req = td.buildSendRequest(mq, msg)
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) {
+ resp := new(primitive.SendResult)
if e != nil {
- rlog.Error("send trace data error", map[string]interface{}{
+ rlog.Info("send trace data error.", map[string]interface{}{
"traceData": data,
})
+ } else {
+ td.cli.ProcessSendResponse(mq.BrokerName, command, resp, msg)
+ rlog.Debug("send trace data success:", map[string]interface{}{
+ "SendResult": resp,
+ "traceData": data,
+ })
}
})
- rlog.Error("send trace data error when invoke", map[string]interface{}{
- rlog.LogKeyUnderlayError: err,
- })
+ if err != nil {
+ rlog.Info("send trace data error when invoke", map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
+ }
}
-func (td *traceDispatcher) findMq() (*primitive.MessageQueue, string) {
- mqs, err := td.namesrvs.FetchPublishMessageQueues(td.traceTopic)
+func (td *traceDispatcher) findMq(regionID string) (*primitive.MessageQueue, string) {
+ traceTopic := td.traceTopic
+ if td.access == primitive.Cloud {
+ traceTopic = td.traceTopic + regionID
+ }
+ mqs, err := td.namesrvs.FetchPublishMessageQueues(traceTopic)
if err != nil {
rlog.Error("fetch publish message queues failed", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
diff --git a/primitive/message.go b/primitive/message.go
index 7d12edb..6a84477 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -237,6 +237,7 @@ func (m *Message) Marshal() []byte {
type MessageExt struct {
Message
MsgId string
+ OffsetMsgId string
StoreSize int32
QueueOffset int64
SysFlag int32
@@ -263,9 +264,9 @@ func (msgExt *MessageExt) IsTraceOn() string {
}
func (msgExt *MessageExt) String() string {
- return fmt.Sprintf("[Message=%s, MsgId=%s, QueueId=%d, StoreSize=%d, QueueOffset=%d, SysFlag=%d, "+
+ return fmt.Sprintf("[Message=%s, MsgId=%s, OffsetMsgId=%s,QueueId=%d, StoreSize=%d, QueueOffset=%d, SysFlag=%d, "+
"BornTimestamp=%d, BornHost=%s, StoreTimestamp=%d, StoreHost=%s, CommitLogOffset=%d, BodyCRC=%d, "+
- "ReconsumeTimes=%d, PreparedTransactionOffset=%d]", msgExt.Message.String(), msgExt.MsgId, msgExt.Queue.QueueId,
+ "ReconsumeTimes=%d, PreparedTransactionOffset=%d]", msgExt.Message.String(), msgExt.MsgId, msgExt.OffsetMsgId, msgExt.Queue.QueueId,
msgExt.StoreSize, msgExt.QueueOffset, msgExt.SysFlag, msgExt.BornTimestamp, msgExt.BornHost,
msgExt.StoreTimestamp, msgExt.StoreHost, msgExt.CommitLogOffset, msgExt.BodyCRC, msgExt.ReconsumeTimes,
msgExt.PreparedTransactionOffset)
@@ -364,11 +365,17 @@ func DecodeMessage(data []byte) []*MessageExt {
}
count += 2 + int(propertiesLength)
- msg.MsgId = CreateMessageId(hostBytes, port, msg.CommitLogOffset)
+ msg.OffsetMsgId = CreateMessageId(hostBytes, port, msg.CommitLogOffset)
//count += 16
if msg.properties == nil {
msg.properties = make(map[string]string, 0)
}
+ msgID := msg.GetProperty(PropertyUniqueClientMessageIdKeyIndex)
+ if len(msgID) == 0 {
+ msg.MsgId = msg.OffsetMsgId
+ } else {
+ msg.MsgId = msgID
+ }
msgs = append(msgs, msg)
}
diff --git a/primitive/trace.go b/primitive/trace.go
index c898623..c0df5b3 100644
--- a/primitive/trace.go
+++ b/primitive/trace.go
@@ -20,6 +20,7 @@ package primitive
// config for message trace.
type TraceConfig struct {
TraceTopic string
+ GroupName string
Access AccessChannel
NamesrvAddrs []string
Credentials // acl config for trace. omit if acl is closed on broker.