You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/09/01 07:27:57 UTC
[rocketmq-clients] branch master updated: Log error with requestId (#215)
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 616040a Log error with requestId (#215)
616040a is described below
commit 616040af196b974c3d55e16c88431ccdcaaf2fc5
Author: guyinyou <36...@users.noreply.github.com>
AuthorDate: Thu Sep 1 15:27:52 2022 +0800
Log error with requestId (#215)
* log error with requestId
* debug
* debug
* fix unit test
Co-authored-by: guyinyou <gu...@alibaba-inc.com>
---
golang/client.go | 6 +++---
golang/pkg/utils/utils.go | 12 ++++++++++++
golang/producer.go | 20 ++++++++++----------
golang/producer_test.go | 2 +-
golang/simple_consumer.go | 8 ++++----
5 files changed, 30 insertions(+), 18 deletions(-)
diff --git a/golang/client.go b/golang/client.go
index 48f00f9..ee0dc62 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -336,13 +336,13 @@ func (cli *defaultClient) doHeartbeat(target string, request *v2.HeartbeatReques
}
resp, err := cli.clientManager.HeartBeat(ctx, endpoints, request, cli.settings.GetRequestTimeout())
if err != nil {
- return fmt.Errorf("failed to send heartbeat, endpoints=%v, err=%v", endpoints, err)
+ return fmt.Errorf("failed to send heartbeat, endpoints=%v, err=%v, requestId=%s", endpoints, err, utils.GetRequestID(ctx))
}
if resp.Status.GetCode() != v2.Code_OK {
if resp.Status.GetCode() == v2.Code_UNRECOGNIZED_CLIENT_TYPE {
go cli.trySyncSettings()
}
- cli.log.Errorf("failed to send heartbeat, code=%v, status message=[%s], endpoints=%v", resp.Status.GetCode(), resp.Status.GetMessage(), endpoints)
+ cli.log.Errorf("failed to send heartbeat, code=%v, status message=[%s], endpoints=%v, requestId=%s", resp.Status.GetCode(), resp.Status.GetMessage(), endpoints, utils.GetRequestID(ctx))
return &ErrRpcStatus{
Code: int32(resp.Status.GetCode()),
Message: resp.GetStatus().GetMessage(),
@@ -352,7 +352,7 @@ func (cli *defaultClient) doHeartbeat(target string, request *v2.HeartbeatReques
switch p := cli.clientImpl.(type) {
case *defaultProducer:
if _, ok := p.isolated.LoadAndDelete(target); ok {
- cli.log.Infof("rejoin endpoints which is isolated before, endpoints=%v\n", endpoints)
+ cli.log.Infof("rejoin endpoints which is isolated before, endpoints=%v", endpoints)
}
default:
// ignore
diff --git a/golang/pkg/utils/utils.go b/golang/pkg/utils/utils.go
index 2dfc0a9..d07e60e 100644
--- a/golang/pkg/utils/utils.go
+++ b/golang/pkg/utils/utils.go
@@ -20,6 +20,7 @@ package utils
import (
"bytes"
"compress/gzip"
+ "context"
"encoding/hex"
"fmt"
"io/ioutil"
@@ -32,9 +33,11 @@ import (
"sync/atomic"
"time"
+ "github.com/apache/rocketmq-clients/golang/metadata"
v2 "github.com/apache/rocketmq-clients/golang/protocol/v2"
"github.com/valyala/fastrand"
"go.opencensus.io/trace"
+ MD "google.golang.org/grpc/metadata"
)
func Mod(n int32, m int) int {
@@ -264,3 +267,12 @@ func GetMacAddress() []byte {
}
return nil
}
+
+// GetRequestID returns the request ID carried in the outgoing gRPC metadata
+// of ctx, formatted for use in log and error messages.
+//
+// It returns the placeholder "nil" when ctx has no outgoing metadata or when
+// the metadata holds no value under metadata.RequestID. A single value is
+// returned as-is rather than in slice notation ("[id]"); if several values
+// are present they are all rendered with %v so none is dropped.
+func GetRequestID(ctx context.Context) string {
+	md, ok := MD.FromOutgoingContext(ctx)
+	if !ok {
+		return "nil"
+	}
+	// MD.Get returns every value stored under the key as a []string, so the
+	// slice must be unpacked before it is usable as a plain identifier.
+	switch ids := md.Get(metadata.RequestID); len(ids) {
+	case 0:
+		return "nil"
+	case 1:
+		return ids[0]
+	default:
+		return fmt.Sprintf("%v", ids)
+	}
+}
diff --git a/golang/producer.go b/golang/producer.go
index ee7281e..d87af44 100644
--- a/golang/producer.go
+++ b/golang/producer.go
@@ -56,7 +56,7 @@ func (p *defaultProducer) Start() error {
}
err2 := p.GracefulStop()
if err2 != nil {
- return fmt.Errorf("startUp err=%w, shutdown err=%w", err, err2)
+ return fmt.Errorf("startUp err=%w, shutdown err=%v", err, err2)
}
return fmt.Errorf("startUp err=%w", err)
}
@@ -227,14 +227,14 @@ func (p *defaultProducer) send1(ctx context.Context, topic string, messageType v
p.isolated.Store(utils.ParseAddress(address), true)
}
if attempt >= maxAttempts {
- p.cli.log.Warnf("failed to send message(s) finally, run out of attempt times, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v",
- topic, messageIds, maxAttempts, attempt, endpoints)
+ p.cli.log.Errorf("failed to send message(s) finally, run out of attempt times, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v, requestId=%s",
+ topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
return nil, err
}
// No need more attempts for transactional message.
if messageType == v2.MessageType_TRANSACTION {
- p.cli.log.Errorf("failed to send transactional message finally, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%s",
- topic, messageIds, maxAttempts, attempt, endpoints)
+ p.cli.log.Errorf("failed to send transactional message finally, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%s, requestId=%s",
+ topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
return nil, err
}
// Try to do more attempts.
@@ -242,12 +242,12 @@ func (p *defaultProducer) send1(ctx context.Context, topic string, messageType v
// Retry immediately if the request is not throttled.
if tooManyRequests {
waitTime := p.getNextAttemptDelay(nextAttempt)
- p.cli.log.Warnf("failed to send message due to too many requests, would attempt to resend after %v, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v",
- waitTime, topic, messageIds, maxAttempts, attempt, endpoints)
+ p.cli.log.Warnf("failed to send message due to too many requests, would attempt to resend after %v, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v, requestId=%s",
+ waitTime, topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
time.Sleep(waitTime)
} else {
- p.cli.log.Warnf("failed to send message, would attempt to resend right now, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v",
- topic, messageIds, maxAttempts, attempt, endpoints)
+ p.cli.log.Warnf("failed to send message, would attempt to resend right now, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v, requestId=%s",
+ topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
}
return p.send1(ctx, topic, messageType, candidates, pubMessages, nextAttempt)
}
@@ -440,7 +440,7 @@ func (p *defaultProducer) onRecoverOrphanedTransactionCommand(endpoints *v2.Endp
err := p.endTransaction(context.TODO(), endpoints,
mv.GetMessageCommon(), messageId, transactionId, resolution)
if err != nil {
- p.cli.log.Errorf("exception raised while ending the transaction, messageId=%s, transactionId=%s, endpoints=%v, err=%w\n", messageId, transactionId, endpoints, err)
+ p.cli.log.Errorf("exception raised while ending the transaction, messageId=%s, transactionId=%s, endpoints=%v, err=%w", messageId, transactionId, endpoints, err)
}
}(messageView)
return nil
diff --git a/golang/producer_test.go b/golang/producer_test.go
index 5418a2d..312fcd7 100644
--- a/golang/producer_test.go
+++ b/golang/producer_test.go
@@ -54,7 +54,7 @@ func TestProducer(t *testing.T) {
Endpoint: endpoints,
Group: MOCK_GROUP,
Credentials: &credentials.SessionCredentials{},
- }, WithTopics(MOCK_TOPIC))
+ })
if err != nil {
t.Error(err)
}
diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go
index 984b368..81e98b8 100644
--- a/golang/simple_consumer.go
+++ b/golang/simple_consumer.go
@@ -89,9 +89,9 @@ func (sc *defaultSimpleConsumer) changeInvisibleDuration0(messageView *MessageVi
duration := time.Since(watchTime)
messageHookPointsStatus := MessageHookPointsStatus_OK
if err != nil {
- sc.cli.log.Errorf("exception raised during message acknowledgement, messageId=%s, endpoints=%v", messageView.GetMessageId(), endpoints)
+ sc.cli.log.Errorf("exception raised during message acknowledgement, messageId=%s, endpoints=%v, requestId=%s", messageView.GetMessageId(), endpoints, utils.GetRequestID(ctx))
} else if resp.GetStatus().GetCode() != v2.Code_OK {
- sc.cli.log.Errorf("failed to change message invisible duration, messageId=%s, endpoints=%v, code=%v, status message=[%s]", messageView.GetMessageId(), endpoints, resp.GetStatus().GetCode(), resp.GetStatus().GetMessage())
+ sc.cli.log.Errorf("failed to change message invisible duration, messageId=%s, endpoints=%v, code=%v, status message=[%s], requestId=%s", messageView.GetMessageId(), endpoints, resp.GetStatus().GetCode(), resp.GetStatus().GetMessage(), utils.GetRequestID(ctx))
err = &ErrRpcStatus{
Code: int32(resp.Status.GetCode()),
Message: resp.GetStatus().GetMessage(),
@@ -207,7 +207,7 @@ func (sc *defaultSimpleConsumer) receiveMessage(ctx context.Context, request *v2
break
}
if err != nil {
- sc.cli.log.Errorf("simpleConsumer recv msg err=%w", err)
+ sc.cli.log.Errorf("simpleConsumer recv msg err=%w, requestId=%s", err, utils.GetRequestID(ctx))
break
}
resps = append(resps, resp)
@@ -364,7 +364,7 @@ func (sc *defaultSimpleConsumer) Start() error {
}
err2 := sc.GracefulStop()
if err2 != nil {
- return fmt.Errorf("startUp err=%w, shutdown err=%w", err, err2)
+ return fmt.Errorf("startUp err=%w, shutdown err=%v", err, err2)
}
return err
}