You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by ju...@apache.org on 2021/12/05 11:46:10 UTC
[dubbo-go] 01/04: feat: detail dubbo logs
This is an automated email from the ASF dual-hosted git repository.
justxuewei pushed a commit to branch feat-adasvc
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
commit 4a04d4a7d5613c0f61003db38f38eb221035c9e7
Author: XavierNiu <a...@nxw.name>
AuthorDate: Sun Dec 5 18:46:55 2021 +0800
feat: detail dubbo logs
---
cluster/cluster/adaptivesvc/cluster_invoker.go | 4 ++--
cluster/loadbalance/p2c/loadbalance.go | 12 +++++++++++-
common/constant/key.go | 2 +-
filter/adaptivesvc/filter.go | 4 ++--
filter/adaptivesvc/limiter/hill_climbing.go | 2 +-
protocol/dubbo/dubbo_invoker.go | 3 ++-
protocol/dubbo/dubbo_protocol.go | 1 +
protocol/protocolwrapper/protocol_filter_wrapper.go | 6 ++++--
protocol/result.go | 6 ++++++
remoting/codec.go | 1 +
remoting/exchange.go | 6 ++++++
remoting/exchange_client.go | 4 ++++
remoting/getty/listener.go | 17 +++++++++++------
13 files changed, 52 insertions(+), 16 deletions(-)
diff --git a/cluster/cluster/adaptivesvc/cluster_invoker.go b/cluster/cluster/adaptivesvc/cluster_invoker.go
index 2a7bc8b..5678778 100644
--- a/cluster/cluster/adaptivesvc/cluster_invoker.go
+++ b/cluster/cluster/adaptivesvc/cluster_invoker.go
@@ -62,10 +62,10 @@ func (ivk *adaptiveServiceClusterInvoker) Invoke(ctx context.Context, invocation
result := invoker.Invoke(ctx, invocation)
// TODO(justxuewei): remove after test
- logger.Debugf("%#v", result.Result())
+ logger.Debugf("result: Result: %#v", result.Attachments())
// update metrics
- remainingStr := invocation.AttachmentsByKey(constant.AdaptiveServiceRemainingKey, "")
+ remainingStr := result.Attachment(constant.AdaptiveServiceRemainingKey, "").(string)
remaining, err := strconv.Atoi(remainingStr)
if err != nil {
logger.Warnf("the remaining is unexpected, we need a int type, but we got %s, err: %v.", remainingStr, err)
diff --git a/cluster/loadbalance/p2c/loadbalance.go b/cluster/loadbalance/p2c/loadbalance.go
index e9223fd..174032d 100644
--- a/cluster/loadbalance/p2c/loadbalance.go
+++ b/cluster/loadbalance/p2c/loadbalance.go
@@ -73,6 +73,9 @@ func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.In
j = rand.Intn(len(invokers))
}
}
+ logger.Debugf("[P2C select] Two invokers were selected, i: %d, j: %d, invoker[i]: %s, invoker[j]: %s.",
+ i, j, invokers[i], invokers[j])
+
// TODO(justxuewei): please consider how to get the real method name from $invoke,
// see also [#1511](https://github.com/apache/dubbo-go/issues/1511)
methodName := invocation.MethodName()
@@ -81,6 +84,7 @@ func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.In
remainingIIface, err := m.GetMethodMetrics(invokers[i].GetURL(), methodName, metrics.HillClimbing)
if err != nil {
if errors.Is(err, metrics.ErrMetricsNotFound) {
+ logger.Debugf("[P2C select] The invoker[i] was selected, because it hasn't been selected before.")
return invokers[i]
}
logger.Warnf("get method metrics err: %v", err)
@@ -90,6 +94,7 @@ func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.In
remainingJIface, err := m.GetMethodMetrics(invokers[j].GetURL(), methodName, metrics.HillClimbing)
if err != nil {
if errors.Is(err, metrics.ErrMetricsNotFound) {
+ logger.Debugf("[P2C select] The invoker[j] was selected, because it hasn't been selected before.")
return invokers[j]
}
logger.Warnf("get method metrics err: %v", err)
@@ -99,7 +104,8 @@ func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.In
// Convert interface to int, if the type is unexpected, panic immediately
remainingI, ok := remainingIIface.(uint64)
if !ok {
- panic(fmt.Sprintf("the type of %s expects to be uint64, but gets %T", metrics.HillClimbing, remainingIIface))
+ panic(fmt.Sprintf("[P2C select] the type of %s expects to be uint64, but gets %T",
+ metrics.HillClimbing, remainingIIface))
}
remainingJ, ok := remainingJIface.(uint64)
@@ -107,10 +113,14 @@ func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.In
panic(fmt.Sprintf("the type of %s expects to be uint64, but gets %T", metrics.HillClimbing, remainingJIface))
}
+ logger.Debugf("[P2C select] The invoker[i] remaining is %d, and the invoker[j] is %d.", remainingI, remainingJ)
+
// For the remaining capacity, the bigger, the better.
if remainingI > remainingJ {
+ logger.Debugf("[P2C select] The invoker[i] was selected.")
return invokers[i]
}
+ logger.Debugf("[P2C select] The invoker[j] was selected.")
return invokers[j]
}
diff --git a/common/constant/key.go b/common/constant/key.go
index 4b467af..cde0561 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -65,7 +65,7 @@ const (
const (
AccessLogFilterKey = "accesslog"
ActiveFilterKey = "active"
- AdaptiveServiceProviderFilterKey = "adaptive-service-provider"
+ AdaptiveServiceProviderFilterKey = "padasvc"
AuthConsumerFilterKey = "sign"
AuthProviderFilterKey = "auth"
EchoFilterKey = "echo"
diff --git a/filter/adaptivesvc/filter.go b/filter/adaptivesvc/filter.go
index 98934d0..f2d7d1b 100644
--- a/filter/adaptivesvc/filter.go
+++ b/filter/adaptivesvc/filter.go
@@ -116,8 +116,8 @@ func (f *adaptiveServiceProviderFilter) OnResponse(_ context.Context, result pro
}
// set attachments to inform consumer of provider status
- invocation.SetAttachments(constant.AdaptiveServiceRemainingKey, fmt.Sprintf("%d", l.Remaining()))
- invocation.SetAttachments(constant.AdaptiveServiceInflightKey, fmt.Sprintf("%d", l.Inflight()))
+ result.AddAttachment(constant.AdaptiveServiceRemainingKey, fmt.Sprintf("%d", l.Remaining()))
+ result.AddAttachment(constant.AdaptiveServiceInflightKey, fmt.Sprintf("%d", l.Inflight()))
logger.Debugf("[adasvc filter] The attachments are set, %s: %d, %s: %d.",
constant.AdaptiveServiceRemainingKey, l.Remaining(),
constant.AdaptiveServiceInflightKey, l.Inflight())
diff --git a/filter/adaptivesvc/limiter/hill_climbing.go b/filter/adaptivesvc/limiter/hill_climbing.go
index 171d89e..10727e6 100644
--- a/filter/adaptivesvc/limiter/hill_climbing.go
+++ b/filter/adaptivesvc/limiter/hill_climbing.go
@@ -298,7 +298,7 @@ func (u *HillClimbingUpdater) adjustLimitation(option HillClimbingOption) error
limitation = math.Max(1.0, math.Min(limitation, float64(maxLimitation)))
u.limiter.limitation.Store(uint64(limitation))
- VerboseDebugf("[HillClimbingUpdater] The limitation is update from %d to %d.", oldLimitation, uint64(limitation))
+ VerboseDebugf("[HillClimbingUpdater] The limitation is update from %d to %d.", uint64(oldLimitation), uint64(limitation))
return nil
}
diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go
index 0f23070..232eeaf 100644
--- a/protocol/dubbo/dubbo_invoker.go
+++ b/protocol/dubbo/dubbo_invoker.go
@@ -158,7 +158,8 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
result.Rest = inv.Reply()
result.Attrs = rest.Attrs
}
- logger.Debugf("result.Err: %v, result.Rest: %v", result.Err, result.Rest)
+
+ logger.Debugf("[DubboInvoker.Invoke] received rpc result form server: %s", result)
return &result
}
diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go
index 58c0601..8b46983 100644
--- a/protocol/dubbo/dubbo_protocol.go
+++ b/protocol/dubbo/dubbo_protocol.go
@@ -167,6 +167,7 @@ func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult
// p.Header.ResponseStatus = hessian.Response_OK
// p.Body = hessian.NewResponse(res, nil, result.Attachments())
}
+ result.Attrs = invokeResult.Attachments()
} else {
result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
}
diff --git a/protocol/protocolwrapper/protocol_filter_wrapper.go b/protocol/protocolwrapper/protocol_filter_wrapper.go
index d14378c..da07694 100644
--- a/protocol/protocolwrapper/protocol_filter_wrapper.go
+++ b/protocol/protocolwrapper/protocol_filter_wrapper.go
@@ -88,9 +88,11 @@ func BuildInvokerChain(invoker protocol.Invoker, key string) protocol.Invoker {
}
if key == constant.ServiceFilterKey {
- logger.Debugf("[BuildInvokerChain] The provider filters are %s, invoker: %s", filterNames, invoker)
+ logger.Debugf("[BuildInvokerChain] The provider invocation link is %s, invoker: %s",
+ strings.Join(append(filterNames, "proxyInvoker"), " -> "), invoker)
} else if key == constant.ReferenceFilterKey {
- logger.Debugf("[BuildInvokerChain] The consumer filters are %s, invoker: %s", filterNames, invoker)
+ logger.Debugf("[BuildInvokerChain] The consumer filters are %s, invoker: %s",
+ strings.Join(append(filterNames, "proxyInvoker"), " -> "), invoker)
}
return next
}
diff --git a/protocol/result.go b/protocol/result.go
index a36b16d..16d76ec 100644
--- a/protocol/result.go
+++ b/protocol/result.go
@@ -17,6 +17,8 @@
package protocol
+import "fmt"
+
// Result is a RPC result
type Result interface {
// SetError sets error.
@@ -92,3 +94,7 @@ func (r *RPCResult) Attachment(key string, defaultValue interface{}) interface{}
}
return v
}
+
+func (r *RPCResult) String() string {
+ return fmt.Sprintf("&RPCResult{Rest: %v, Attrs: %v, Err: %v}", r.Rest, r.Attrs, r.Err)
+}
diff --git a/remoting/codec.go b/remoting/codec.go
index e1d1c8b..8e09490 100644
--- a/remoting/codec.go
+++ b/remoting/codec.go
@@ -29,6 +29,7 @@ type Codec interface {
}
type DecodeResult struct {
+ // IsRequest indicates whether the current request is a heartbeat request
IsRequest bool
Result interface{}
}
diff --git a/remoting/exchange.go b/remoting/exchange.go
index 1fda038..aa81689 100644
--- a/remoting/exchange.go
+++ b/remoting/exchange.go
@@ -18,6 +18,7 @@
package remoting
import (
+ "fmt"
"sync"
"time"
)
@@ -115,6 +116,11 @@ func (response *Response) Handle() {
}
}
+func (response *Response) String() string {
+ return fmt.Sprintf("&remoting.Response{ID: %d, Version: %s, SerialID: %d, Status: %d, Event: %v, Error: %v, Result: %v}",
+ response.ID, response.Version, response.SerialID, response.Status, response.Event, response.Error, response.Result)
+}
+
type Options struct {
// connect timeout
ConnectTimeout time.Duration
diff --git a/remoting/exchange_client.go b/remoting/exchange_client.go
index 08098f7..23d33e8 100644
--- a/remoting/exchange_client.go
+++ b/remoting/exchange_client.go
@@ -134,6 +134,10 @@ func (client *ExchangeClient) Request(invocation *protocol.Invocation, url *comm
result.Rest = resultTmp.Rest
result.Attrs = resultTmp.Attrs
result.Err = resultTmp.Err
+ logger.Debugf("[ExchangeClient.Request] RPCResult from server: %v", resultTmp)
+ } else {
+ logger.Warnf("[ExchangeClient.Request] The type of result is unexpected, we want *protocol.RPCResult, "+
+ "but we got %T", rsp.response.Result)
}
return nil
}
diff --git a/remoting/getty/listener.go b/remoting/getty/listener.go
index 78b0f61..65371f3 100644
--- a/remoting/getty/listener.go
+++ b/remoting/getty/listener.go
@@ -102,14 +102,14 @@ func (h *RpcClientHandler) OnClose(session getty.Session) {
func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
result, ok := pkg.(remoting.DecodeResult)
if !ok {
- logger.Errorf("illegal package")
+ logger.Errorf("[RpcClientHandler.OnMessage] getty client gets an unexpected rpc result: %#v", result)
return
}
// get heartbeat request from server
if result.IsRequest {
req := result.Result.(*remoting.Request)
if req.Event {
- logger.Debugf("get rpc heartbeat request{%#v}", req)
+ logger.Debugf("[RpcClientHandler.OnMessage] getty client gets a heartbeat request: %#v", req)
resp := remoting.NewResponse(req.ID, req.Version)
resp.Status = hessian.Response_OK
resp.Event = req.Event
@@ -118,22 +118,23 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
reply(session, resp)
return
}
- logger.Errorf("illegal request but not heartbeat. {%#v}", req)
+ logger.Errorf("[RpcClientHandler.OnMessage] unexpected heartbeat request: %#v", req)
return
}
h.timeoutTimes = 0
p := result.Result.(*remoting.Response)
// get heartbeat
if p.Event {
- logger.Debugf("get rpc heartbeat response{%#v}", p)
+ logger.Debugf("[RpcClientHandler.OnMessage] getty client received a heartbeat response: %s", p)
if p.Error != nil {
- logger.Errorf("rpc heartbeat response{error: %#v}", p.Error)
+ logger.Errorf("[RpcClientHandler.OnMessage] a heartbeat response received by the getty client "+
+ "encounters an error: %v", p.Error)
}
p.Handle()
return
}
- logger.Debugf("get rpc response{%#v}", p)
+ logger.Debugf("[RpcClientHandler.OnMessage] getty client received a response: %s", p)
h.conn.updateSession(session)
@@ -303,11 +304,15 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
attachments[constant.LocalAddr] = session.LocalAddr()
attachments[constant.RemoteAddr] = session.RemoteAddr()
+ logger.Debugf("[RpcServerHandler.OnMessage] invoc.Attrs: %v, invoc.MethodName: %s",
+ invoc.Attachments(), invoc.MethodName())
+
result := h.server.requestHandler(invoc)
if !req.TwoWay {
return
}
resp.Result = result
+ logger.Debugf("[RpcServerHandler.OnMessage] result attrs: %v, req: %v", result.Attrs, req)
reply(session, resp)
}