You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/09/01 07:27:57 UTC

[rocketmq-clients] branch master updated: Log error with requestId (#215)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 616040a  Log error with requestId (#215)
616040a is described below

commit 616040af196b974c3d55e16c88431ccdcaaf2fc5
Author: guyinyou <36...@users.noreply.github.com>
AuthorDate: Thu Sep 1 15:27:52 2022 +0800

    Log error with requestId (#215)
    
    * log error with requestId
    
    * debug
    
    * debug
    
    * fix unit test
    
    Co-authored-by: guyinyou <gu...@alibaba-inc.com>
---
 golang/client.go          |  6 +++---
 golang/pkg/utils/utils.go | 12 ++++++++++++
 golang/producer.go        | 20 ++++++++++----------
 golang/producer_test.go   |  2 +-
 golang/simple_consumer.go |  8 ++++----
 5 files changed, 30 insertions(+), 18 deletions(-)

diff --git a/golang/client.go b/golang/client.go
index 48f00f9..ee0dc62 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -336,13 +336,13 @@ func (cli *defaultClient) doHeartbeat(target string, request *v2.HeartbeatReques
 	}
 	resp, err := cli.clientManager.HeartBeat(ctx, endpoints, request, cli.settings.GetRequestTimeout())
 	if err != nil {
-		return fmt.Errorf("failed to send heartbeat, endpoints=%v, err=%v", endpoints, err)
+		return fmt.Errorf("failed to send heartbeat, endpoints=%v, err=%v, requestId=%s", endpoints, err, utils.GetRequestID(ctx))
 	}
 	if resp.Status.GetCode() != v2.Code_OK {
 		if resp.Status.GetCode() == v2.Code_UNRECOGNIZED_CLIENT_TYPE {
 			go cli.trySyncSettings()
 		}
-		cli.log.Errorf("failed to send heartbeat, code=%v, status message=[%s], endpoints=%v", resp.Status.GetCode(), resp.Status.GetMessage(), endpoints)
+		cli.log.Errorf("failed to send heartbeat, code=%v, status message=[%s], endpoints=%v, requestId=%s", resp.Status.GetCode(), resp.Status.GetMessage(), endpoints, utils.GetRequestID(ctx))
 		return &ErrRpcStatus{
 			Code:    int32(resp.Status.GetCode()),
 			Message: resp.GetStatus().GetMessage(),
@@ -352,7 +352,7 @@ func (cli *defaultClient) doHeartbeat(target string, request *v2.HeartbeatReques
 	switch p := cli.clientImpl.(type) {
 	case *defaultProducer:
 		if _, ok := p.isolated.LoadAndDelete(target); ok {
-			cli.log.Infof("rejoin endpoints which is isolated before, endpoints=%v\n", endpoints)
+			cli.log.Infof("rejoin endpoints which is isolated before, endpoints=%v", endpoints)
 		}
 	default:
 		// ignore
diff --git a/golang/pkg/utils/utils.go b/golang/pkg/utils/utils.go
index 2dfc0a9..d07e60e 100644
--- a/golang/pkg/utils/utils.go
+++ b/golang/pkg/utils/utils.go
@@ -20,6 +20,7 @@ package utils
 import (
 	"bytes"
 	"compress/gzip"
+	"context"
 	"encoding/hex"
 	"fmt"
 	"io/ioutil"
@@ -32,9 +33,11 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/apache/rocketmq-clients/golang/metadata"
 	v2 "github.com/apache/rocketmq-clients/golang/protocol/v2"
 	"github.com/valyala/fastrand"
 	"go.opencensus.io/trace"
+	MD "google.golang.org/grpc/metadata"
 )
 
 func Mod(n int32, m int) int {
@@ -264,3 +267,12 @@ func GetMacAddress() []byte {
 	}
 	return nil
 }
+
+// GetRequestID returns the metadata.RequestID values from ctx's outgoing gRPC metadata, formatted with %v, or "nil" when ctx carries no outgoing metadata.
+func GetRequestID(ctx context.Context) string {
+	outgoing, found := MD.FromOutgoingContext(ctx)
+	if !found {
+		return "nil"
+	}
+	return fmt.Sprintf("%v", outgoing.Get(metadata.RequestID))
+}
diff --git a/golang/producer.go b/golang/producer.go
index ee7281e..d87af44 100644
--- a/golang/producer.go
+++ b/golang/producer.go
@@ -56,7 +56,7 @@ func (p *defaultProducer) Start() error {
 	}
 	err2 := p.GracefulStop()
 	if err2 != nil {
-		return fmt.Errorf("startUp err=%w, shutdown err=%w", err, err2)
+		return fmt.Errorf("startUp err=%w, shutdown err=%v", err, err2)
 	}
 	return fmt.Errorf("startUp err=%w", err)
 }
@@ -227,14 +227,14 @@ func (p *defaultProducer) send1(ctx context.Context, topic string, messageType v
 			p.isolated.Store(utils.ParseAddress(address), true)
 		}
 		if attempt >= maxAttempts {
-			p.cli.log.Warnf("failed to send message(s) finally, run out of attempt times, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v",
-				topic, messageIds, maxAttempts, attempt, endpoints)
+			p.cli.log.Errorf("failed to send message(s) finally, run out of attempt times, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v, requestId=%s",
+				topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
 			return nil, err
 		}
 		// No need more attempts for transactional message.
 		if messageType == v2.MessageType_TRANSACTION {
-			p.cli.log.Errorf("failed to send transactional message finally, topic=%s, messageId(s)=%v,  maxAttempts=%d, attempt=%d, endpoints=%s",
-				topic, messageIds, maxAttempts, attempt, endpoints)
+			p.cli.log.Errorf("failed to send transactional message finally, topic=%s, messageId(s)=%v,  maxAttempts=%d, attempt=%d, endpoints=%s, requestId=%s",
+				topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
 			return nil, err
 		}
 		// Try to do more attempts.
@@ -242,12 +242,12 @@ func (p *defaultProducer) send1(ctx context.Context, topic string, messageType v
 		// Retry immediately if the request is not throttled.
 		if tooManyRequests {
 			waitTime := p.getNextAttemptDelay(nextAttempt)
-			p.cli.log.Warnf("failed to send message due to too many requests, would attempt to resend after %v, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v",
-				waitTime, topic, messageIds, maxAttempts, attempt, endpoints)
+			p.cli.log.Warnf("failed to send message due to too many requests, would attempt to resend after %v, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v, requestId=%s",
+				waitTime, topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
 			time.Sleep(waitTime)
 		} else {
-			p.cli.log.Warnf("failed to send message, would attempt to resend right now, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v",
-				topic, messageIds, maxAttempts, attempt, endpoints)
+			p.cli.log.Warnf("failed to send message, would attempt to resend right now, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v, requestId=%s",
+				topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
 		}
 		return p.send1(ctx, topic, messageType, candidates, pubMessages, nextAttempt)
 	}
@@ -440,7 +440,7 @@ func (p *defaultProducer) onRecoverOrphanedTransactionCommand(endpoints *v2.Endp
 		err := p.endTransaction(context.TODO(), endpoints,
 			mv.GetMessageCommon(), messageId, transactionId, resolution)
 		if err != nil {
-			p.cli.log.Errorf("exception raised while ending the transaction, messageId=%s, transactionId=%s, endpoints=%v, err=%w\n", messageId, transactionId, endpoints, err)
+			p.cli.log.Errorf("exception raised while ending the transaction, messageId=%s, transactionId=%s, endpoints=%v, err=%w", messageId, transactionId, endpoints, err)
 		}
 	}(messageView)
 	return nil
diff --git a/golang/producer_test.go b/golang/producer_test.go
index 5418a2d..312fcd7 100644
--- a/golang/producer_test.go
+++ b/golang/producer_test.go
@@ -54,7 +54,7 @@ func TestProducer(t *testing.T) {
 		Endpoint:    endpoints,
 		Group:       MOCK_GROUP,
 		Credentials: &credentials.SessionCredentials{},
-	}, WithTopics(MOCK_TOPIC))
+	})
 	if err != nil {
 		t.Error(err)
 	}
diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go
index 984b368..81e98b8 100644
--- a/golang/simple_consumer.go
+++ b/golang/simple_consumer.go
@@ -89,9 +89,9 @@ func (sc *defaultSimpleConsumer) changeInvisibleDuration0(messageView *MessageVi
 	duration := time.Since(watchTime)
 	messageHookPointsStatus := MessageHookPointsStatus_OK
 	if err != nil {
-		sc.cli.log.Errorf("exception raised during message acknowledgement, messageId=%s, endpoints=%v", messageView.GetMessageId(), endpoints)
+		sc.cli.log.Errorf("exception raised during message acknowledgement, messageId=%s, endpoints=%v, requestId=%s", messageView.GetMessageId(), endpoints, utils.GetRequestID(ctx))
 	} else if resp.GetStatus().GetCode() != v2.Code_OK {
-		sc.cli.log.Errorf("failed to change message invisible duration, messageId=%s, endpoints=%v, code=%v, status message=[%s]", messageView.GetMessageId(), endpoints, resp.GetStatus().GetCode(), resp.GetStatus().GetMessage())
+		sc.cli.log.Errorf("failed to change message invisible duration, messageId=%s, endpoints=%v, code=%v, status message=[%s], requestId=%s", messageView.GetMessageId(), endpoints, resp.GetStatus().GetCode(), resp.GetStatus().GetMessage(), utils.GetRequestID(ctx))
 		err = &ErrRpcStatus{
 			Code:    int32(resp.Status.GetCode()),
 			Message: resp.GetStatus().GetMessage(),
@@ -207,7 +207,7 @@ func (sc *defaultSimpleConsumer) receiveMessage(ctx context.Context, request *v2
 				break
 			}
 			if err != nil {
-				sc.cli.log.Errorf("simpleConsumer recv msg err=%w", err)
+				sc.cli.log.Errorf("simpleConsumer recv msg err=%w, requestId=%s", err, utils.GetRequestID(ctx))
 				break
 			}
 			resps = append(resps, resp)
@@ -364,7 +364,7 @@ func (sc *defaultSimpleConsumer) Start() error {
 	}
 	err2 := sc.GracefulStop()
 	if err2 != nil {
-		return fmt.Errorf("startUp err=%w, shutdown err=%w", err, err2)
+		return fmt.Errorf("startUp err=%w, shutdown err=%v", err, err2)
 	}
 	return err
 }