You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/04/09 07:38:35 UTC
[dubbo-go] 01/01: return decode value as getty standard
This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch fix/getty-eof
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
commit a2869eb2865e57f1ce67b8fcf0da6524ceb467ab
Author: AlexStocks <al...@foxmail.com>
AuthorDate: Sat Apr 9 15:34:56 2022 +0800
return decode value as getty standard
---
protocol/dubbo/dubbo_codec.go | 34 +++++++++++++++++++++-------------
protocol/dubbo/impl/codec.go | 3 +++
remoting/getty/readwriter.go | 9 +++++----
3 files changed, 29 insertions(+), 17 deletions(-)
diff --git a/protocol/dubbo/dubbo_codec.go b/protocol/dubbo/dubbo_codec.go
index 31b13dc00..03f228d00 100644
--- a/protocol/dubbo/dubbo_codec.go
+++ b/protocol/dubbo/dubbo_codec.go
@@ -34,7 +34,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/dubbo/impl"
- "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
+ invct "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
"dubbo.apache.org/dubbo-go/v3/remoting"
)
@@ -161,19 +161,25 @@ func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer,
// Decode data, including request and response.
func (c *DubboCodec) Decode(data []byte) (remoting.DecodeResult, int, error) {
+ var res remoting.DecodeResult
+
+ dataLen := len(data)
+ if dataLen < impl.HEADER_LENGTH { // not enough bytes for a complete header yet
+ return res, 0, nil
+ }
if c.isRequest(data) {
- req, len, err := c.decodeRequest(data)
+ req, length, err := c.decodeRequest(data)
if err != nil {
- return remoting.DecodeResult{}, len, perrors.WithStack(err)
+ return remoting.DecodeResult{}, length, perrors.WithStack(err)
}
- return remoting.DecodeResult{IsRequest: true, Result: req}, len, perrors.WithStack(err)
+ return remoting.DecodeResult{IsRequest: true, Result: req}, length, perrors.WithStack(err)
}
- resp, len, err := c.decodeResponse(data)
+ resp, length, err := c.decodeResponse(data)
if err != nil {
- return remoting.DecodeResult{}, len, perrors.WithStack(err)
+ return remoting.DecodeResult{}, length, perrors.WithStack(err)
}
- return remoting.DecodeResult{IsRequest: false, Result: resp}, len, perrors.WithStack(err)
+ return remoting.DecodeResult{IsRequest: false, Result: resp}, length, perrors.WithStack(err)
}
func (c *DubboCodec) isRequest(data []byte) bool {
@@ -182,16 +188,18 @@ func (c *DubboCodec) isRequest(data []byte) bool {
// decode request
func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) {
- var request *remoting.Request = nil
+ var request *remoting.Request
buf := bytes.NewBuffer(data)
pkg := impl.NewDubboPackage(buf)
pkg.SetBody(make([]interface{}, 7))
err := pkg.Unmarshal()
if err != nil {
originErr := perrors.Cause(err)
- if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
- // FIXME
- return nil, 0, originErr
+ if originErr == hessian.ErrHeaderNotEnough { // should be unreachable: DubboCodec.Decode (dubbo_codec.go, line 167) already returns early when len(data) < impl.HEADER_LENGTH
+ return nil, 0, nil
+ }
+ if originErr == hessian.ErrBodyNotEnough {
+ return nil, hessian.HEADER_LENGTH + pkg.GetBodyLen(), nil
}
logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err)
@@ -223,8 +231,8 @@ func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error)
methodName = pkg.Service.Method
args = req[impl.ArgsKey].([]interface{})
attachments = req[impl.AttachmentsKey].(map[string]interface{})
- invoc := invocation.NewRPCInvocationWithOptions(invocation.WithAttachments(attachments),
- invocation.WithArguments(args), invocation.WithMethodName(methodName))
+ invoc := invct.NewRPCInvocationWithOptions(invct.WithAttachments(attachments),
+ invct.WithArguments(args), invct.WithMethodName(methodName))
request.Data = invoc
}
diff --git a/protocol/dubbo/impl/codec.go b/protocol/dubbo/impl/codec.go
index cc7abad48..95acbd9e0 100644
--- a/protocol/dubbo/impl/codec.go
+++ b/protocol/dubbo/impl/codec.go
@@ -157,6 +157,9 @@ func (c *ProtocolCodec) Decode(p *DubboPackage) error {
return err
}
}
+ if c.reader.Size() < p.GetBodyLen() {
+ return hessian.ErrBodyNotEnough
+ }
body, err := c.reader.Peek(p.GetBodyLen())
if err != nil {
return err
diff --git a/remoting/getty/readwriter.go b/remoting/getty/readwriter.go
index c78f354d6..48f2466c5 100644
--- a/remoting/getty/readwriter.go
+++ b/remoting/getty/readwriter.go
@@ -49,15 +49,16 @@ func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler {
// and send to client each time. the Read can assemble it.
func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
resp, length, err := (p.client.codec).Decode(data)
- // err := pkg.Unmarshal(buf, p.client)
if err != nil {
- if errors.Is(err, hessian.ErrHeaderNotEnough) || errors.Is(err, hessian.ErrBodyNotEnough) {
+ if errors.Is(err, hessian.ErrHeaderNotEnough) {
return nil, 0, nil
}
+ if errors.Is(err, hessian.ErrBodyNotEnough) {
+ return nil, length, nil
+ }
logger.Errorf("pkg.Unmarshal(ss:%+v, len(@data):%d) = error:%+v", ss, len(data), err)
-
- return nil, length, err
+ return nil, 0, err
}
return resp, length, nil