You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by ju...@apache.org on 2021/12/05 11:46:10 UTC

[dubbo-go] 01/04: feat: detail dubbo logs

This is an automated email from the ASF dual-hosted git repository.

justxuewei pushed a commit to branch feat-adasvc
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git

commit 4a04d4a7d5613c0f61003db38f38eb221035c9e7
Author: XavierNiu <a...@nxw.name>
AuthorDate: Sun Dec 5 18:46:55 2021 +0800

    feat: detail dubbo logs
---
 cluster/cluster/adaptivesvc/cluster_invoker.go      |  4 ++--
 cluster/loadbalance/p2c/loadbalance.go              | 12 +++++++++++-
 common/constant/key.go                              |  2 +-
 filter/adaptivesvc/filter.go                        |  4 ++--
 filter/adaptivesvc/limiter/hill_climbing.go         |  2 +-
 protocol/dubbo/dubbo_invoker.go                     |  3 ++-
 protocol/dubbo/dubbo_protocol.go                    |  1 +
 protocol/protocolwrapper/protocol_filter_wrapper.go |  6 ++++--
 protocol/result.go                                  |  6 ++++++
 remoting/codec.go                                   |  1 +
 remoting/exchange.go                                |  6 ++++++
 remoting/exchange_client.go                         |  4 ++++
 remoting/getty/listener.go                          | 17 +++++++++++------
 13 files changed, 52 insertions(+), 16 deletions(-)

diff --git a/cluster/cluster/adaptivesvc/cluster_invoker.go b/cluster/cluster/adaptivesvc/cluster_invoker.go
index 2a7bc8b..5678778 100644
--- a/cluster/cluster/adaptivesvc/cluster_invoker.go
+++ b/cluster/cluster/adaptivesvc/cluster_invoker.go
@@ -62,10 +62,10 @@ func (ivk *adaptiveServiceClusterInvoker) Invoke(ctx context.Context, invocation
 	result := invoker.Invoke(ctx, invocation)
 
 	// TODO(justxuewei): remove after test
-	logger.Debugf("%#v", result.Result())
+	logger.Debugf("result: Result: %#v", result.Attachments())
 
 	// update metrics
-	remainingStr := invocation.AttachmentsByKey(constant.AdaptiveServiceRemainingKey, "")
+	remainingStr := result.Attachment(constant.AdaptiveServiceRemainingKey, "").(string)
 	remaining, err := strconv.Atoi(remainingStr)
 	if err != nil {
 		logger.Warnf("the remaining is unexpected, we need a int type, but we got %s, err: %v.", remainingStr, err)
diff --git a/cluster/loadbalance/p2c/loadbalance.go b/cluster/loadbalance/p2c/loadbalance.go
index e9223fd..174032d 100644
--- a/cluster/loadbalance/p2c/loadbalance.go
+++ b/cluster/loadbalance/p2c/loadbalance.go
@@ -73,6 +73,9 @@ func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.In
 			j = rand.Intn(len(invokers))
 		}
 	}
+	logger.Debugf("[P2C select] Two invokers were selected, i: %d, j: %d, invoker[i]: %s, invoker[j]: %s.",
+		i, j, invokers[i], invokers[j])
+
 	// TODO(justxuewei): please consider how to get the real method name from $invoke,
 	// 	see also [#1511](https://github.com/apache/dubbo-go/issues/1511)
 	methodName := invocation.MethodName()
@@ -81,6 +84,7 @@ func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.In
 	remainingIIface, err := m.GetMethodMetrics(invokers[i].GetURL(), methodName, metrics.HillClimbing)
 	if err != nil {
 		if errors.Is(err, metrics.ErrMetricsNotFound) {
+			logger.Debugf("[P2C select] The invoker[i] was selected, because it hasn't been selected before.")
 			return invokers[i]
 		}
 		logger.Warnf("get method metrics err: %v", err)
@@ -90,6 +94,7 @@ func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.In
 	remainingJIface, err := m.GetMethodMetrics(invokers[j].GetURL(), methodName, metrics.HillClimbing)
 	if err != nil {
 		if errors.Is(err, metrics.ErrMetricsNotFound) {
+			logger.Debugf("[P2C select] The invoker[j] was selected, because it hasn't been selected before.")
 			return invokers[j]
 		}
 		logger.Warnf("get method metrics err: %v", err)
@@ -99,7 +104,8 @@ func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.In
 	// Convert interface to int, if the type is unexpected, panic immediately
 	remainingI, ok := remainingIIface.(uint64)
 	if !ok {
-		panic(fmt.Sprintf("the type of %s expects to be uint64, but gets %T", metrics.HillClimbing, remainingIIface))
+		panic(fmt.Sprintf("[P2C select] the type of %s expects to be uint64, but gets %T",
+			metrics.HillClimbing, remainingIIface))
 	}
 
 	remainingJ, ok := remainingJIface.(uint64)
@@ -107,10 +113,14 @@ func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.In
 		panic(fmt.Sprintf("the type of %s expects to be uint64, but gets %T", metrics.HillClimbing, remainingJIface))
 	}
 
+	logger.Debugf("[P2C select] The invoker[i] remaining is %d, and the invoker[j] is %d.", remainingI, remainingJ)
+
 	// For the remaining capacity, the bigger, the better.
 	if remainingI > remainingJ {
+		logger.Debugf("[P2C select] The invoker[i] was selected.")
 		return invokers[i]
 	}
 
+	logger.Debugf("[P2C select] The invoker[j] was selected.")
 	return invokers[j]
 }
diff --git a/common/constant/key.go b/common/constant/key.go
index 4b467af..cde0561 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -65,7 +65,7 @@ const (
 const (
 	AccessLogFilterKey                   = "accesslog"
 	ActiveFilterKey                      = "active"
-	AdaptiveServiceProviderFilterKey     = "adaptive-service-provider"
+	AdaptiveServiceProviderFilterKey     = "padasvc"
 	AuthConsumerFilterKey                = "sign"
 	AuthProviderFilterKey                = "auth"
 	EchoFilterKey                        = "echo"
diff --git a/filter/adaptivesvc/filter.go b/filter/adaptivesvc/filter.go
index 98934d0..f2d7d1b 100644
--- a/filter/adaptivesvc/filter.go
+++ b/filter/adaptivesvc/filter.go
@@ -116,8 +116,8 @@ func (f *adaptiveServiceProviderFilter) OnResponse(_ context.Context, result pro
 	}
 
 	// set attachments to inform consumer of provider status
-	invocation.SetAttachments(constant.AdaptiveServiceRemainingKey, fmt.Sprintf("%d", l.Remaining()))
-	invocation.SetAttachments(constant.AdaptiveServiceInflightKey, fmt.Sprintf("%d", l.Inflight()))
+	result.AddAttachment(constant.AdaptiveServiceRemainingKey, fmt.Sprintf("%d", l.Remaining()))
+	result.AddAttachment(constant.AdaptiveServiceInflightKey, fmt.Sprintf("%d", l.Inflight()))
 	logger.Debugf("[adasvc filter] The attachments are set, %s: %d, %s: %d.",
 		constant.AdaptiveServiceRemainingKey, l.Remaining(),
 		constant.AdaptiveServiceInflightKey, l.Inflight())
diff --git a/filter/adaptivesvc/limiter/hill_climbing.go b/filter/adaptivesvc/limiter/hill_climbing.go
index 171d89e..10727e6 100644
--- a/filter/adaptivesvc/limiter/hill_climbing.go
+++ b/filter/adaptivesvc/limiter/hill_climbing.go
@@ -298,7 +298,7 @@ func (u *HillClimbingUpdater) adjustLimitation(option HillClimbingOption) error
 
 	limitation = math.Max(1.0, math.Min(limitation, float64(maxLimitation)))
 	u.limiter.limitation.Store(uint64(limitation))
-	VerboseDebugf("[HillClimbingUpdater] The limitation is update from %d to %d.", oldLimitation, uint64(limitation))
+	VerboseDebugf("[HillClimbingUpdater] The limitation is update from %d to %d.", uint64(oldLimitation), uint64(limitation))
 	return nil
 }
 
diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go
index 0f23070..232eeaf 100644
--- a/protocol/dubbo/dubbo_invoker.go
+++ b/protocol/dubbo/dubbo_invoker.go
@@ -158,7 +158,8 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
 		result.Rest = inv.Reply()
 		result.Attrs = rest.Attrs
 	}
-	logger.Debugf("result.Err: %v, result.Rest: %v", result.Err, result.Rest)
+
+	logger.Debugf("[DubboInvoker.Invoke] received rpc result form server: %s", result)
 
 	return &result
 }
diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go
index 58c0601..8b46983 100644
--- a/protocol/dubbo/dubbo_protocol.go
+++ b/protocol/dubbo/dubbo_protocol.go
@@ -167,6 +167,7 @@ func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult
 			// p.Header.ResponseStatus = hessian.Response_OK
 			// p.Body = hessian.NewResponse(res, nil, result.Attachments())
 		}
+		result.Attrs = invokeResult.Attachments()
 	} else {
 		result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
 	}
diff --git a/protocol/protocolwrapper/protocol_filter_wrapper.go b/protocol/protocolwrapper/protocol_filter_wrapper.go
index d14378c..da07694 100644
--- a/protocol/protocolwrapper/protocol_filter_wrapper.go
+++ b/protocol/protocolwrapper/protocol_filter_wrapper.go
@@ -88,9 +88,11 @@ func BuildInvokerChain(invoker protocol.Invoker, key string) protocol.Invoker {
 	}
 
 	if key == constant.ServiceFilterKey {
-		logger.Debugf("[BuildInvokerChain] The provider filters are %s, invoker: %s", filterNames, invoker)
+		logger.Debugf("[BuildInvokerChain] The provider invocation link is %s, invoker: %s",
+			strings.Join(append(filterNames, "proxyInvoker"), " -> "), invoker)
 	} else if key == constant.ReferenceFilterKey {
-		logger.Debugf("[BuildInvokerChain] The consumer filters are %s, invoker: %s", filterNames, invoker)
+		logger.Debugf("[BuildInvokerChain] The consumer filters are %s, invoker: %s",
+			strings.Join(append(filterNames, "proxyInvoker"), " -> "), invoker)
 	}
 	return next
 }
diff --git a/protocol/result.go b/protocol/result.go
index a36b16d..16d76ec 100644
--- a/protocol/result.go
+++ b/protocol/result.go
@@ -17,6 +17,8 @@
 
 package protocol
 
+import "fmt"
+
 // Result is a RPC result
 type Result interface {
 	// SetError sets error.
@@ -92,3 +94,7 @@ func (r *RPCResult) Attachment(key string, defaultValue interface{}) interface{}
 	}
 	return v
 }
+
+func (r *RPCResult) String() string {
+	return fmt.Sprintf("&RPCResult{Rest: %v, Attrs: %v, Err: %v}", r.Rest, r.Attrs, r.Err)
+}
diff --git a/remoting/codec.go b/remoting/codec.go
index e1d1c8b..8e09490 100644
--- a/remoting/codec.go
+++ b/remoting/codec.go
@@ -29,6 +29,7 @@ type Codec interface {
 }
 
 type DecodeResult struct {
+	// IsRequest indicates whether the decoded package is a request (true) or a response (false)
 	IsRequest bool
 	Result    interface{}
 }
diff --git a/remoting/exchange.go b/remoting/exchange.go
index 1fda038..aa81689 100644
--- a/remoting/exchange.go
+++ b/remoting/exchange.go
@@ -18,6 +18,7 @@
 package remoting
 
 import (
+	"fmt"
 	"sync"
 	"time"
 )
@@ -115,6 +116,11 @@ func (response *Response) Handle() {
 	}
 }
 
+func (response *Response) String() string {
+	return fmt.Sprintf("&remoting.Response{ID: %d, Version: %s, SerialID: %d, Status: %d, Event: %v, Error: %v, Result: %v}",
+		response.ID, response.Version, response.SerialID, response.Status, response.Event, response.Error, response.Result)
+}
+
 type Options struct {
 	// connect timeout
 	ConnectTimeout time.Duration
diff --git a/remoting/exchange_client.go b/remoting/exchange_client.go
index 08098f7..23d33e8 100644
--- a/remoting/exchange_client.go
+++ b/remoting/exchange_client.go
@@ -134,6 +134,10 @@ func (client *ExchangeClient) Request(invocation *protocol.Invocation, url *comm
 		result.Rest = resultTmp.Rest
 		result.Attrs = resultTmp.Attrs
 		result.Err = resultTmp.Err
+		logger.Debugf("[ExchangeClient.Request] RPCResult from server: %v", resultTmp)
+	} else {
+		logger.Warnf("[ExchangeClient.Request] The type of result is unexpected, we want *protocol.RPCResult, "+
+			"but we got %T", rsp.response.Result)
 	}
 	return nil
 }
diff --git a/remoting/getty/listener.go b/remoting/getty/listener.go
index 78b0f61..65371f3 100644
--- a/remoting/getty/listener.go
+++ b/remoting/getty/listener.go
@@ -102,14 +102,14 @@ func (h *RpcClientHandler) OnClose(session getty.Session) {
 func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
 	result, ok := pkg.(remoting.DecodeResult)
 	if !ok {
-		logger.Errorf("illegal package")
+		logger.Errorf("[RpcClientHandler.OnMessage] getty client gets an unexpected rpc result: %#v", result)
 		return
 	}
 	// get heartbeat request from server
 	if result.IsRequest {
 		req := result.Result.(*remoting.Request)
 		if req.Event {
-			logger.Debugf("get rpc heartbeat request{%#v}", req)
+			logger.Debugf("[RpcClientHandler.OnMessage] getty client gets a heartbeat request: %#v", req)
 			resp := remoting.NewResponse(req.ID, req.Version)
 			resp.Status = hessian.Response_OK
 			resp.Event = req.Event
@@ -118,22 +118,23 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
 			reply(session, resp)
 			return
 		}
-		logger.Errorf("illegal request but not heartbeat. {%#v}", req)
+		logger.Errorf("[RpcClientHandler.OnMessage] unexpected heartbeat request: %#v", req)
 		return
 	}
 	h.timeoutTimes = 0
 	p := result.Result.(*remoting.Response)
 	// get heartbeat
 	if p.Event {
-		logger.Debugf("get rpc heartbeat response{%#v}", p)
+		logger.Debugf("[RpcClientHandler.OnMessage] getty client received a heartbeat response: %s", p)
 		if p.Error != nil {
-			logger.Errorf("rpc heartbeat response{error: %#v}", p.Error)
+			logger.Errorf("[RpcClientHandler.OnMessage] a heartbeat response received by the getty client "+
+				"encounters an error: %v", p.Error)
 		}
 		p.Handle()
 		return
 	}
 
-	logger.Debugf("get rpc response{%#v}", p)
+	logger.Debugf("[RpcClientHandler.OnMessage] getty client received a response: %s", p)
 
 	h.conn.updateSession(session)
 
@@ -303,11 +304,15 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
 	attachments[constant.LocalAddr] = session.LocalAddr()
 	attachments[constant.RemoteAddr] = session.RemoteAddr()
 
+	logger.Debugf("[RpcServerHandler.OnMessage] invoc.Attrs: %v, invoc.MethodName: %s",
+		invoc.Attachments(), invoc.MethodName())
+
 	result := h.server.requestHandler(invoc)
 	if !req.TwoWay {
 		return
 	}
 	resp.Result = result
+	logger.Debugf("[RpcServerHandler.OnMessage] result attrs: %v, req: %v", result.Attrs, req)
 	reply(session, resp)
 }