You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/04/09 07:38:35 UTC

[dubbo-go] 01/01: return decode value as getty standard

This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch fix/getty-eof
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git

commit a2869eb2865e57f1ce67b8fcf0da6524ceb467ab
Author: AlexStocks <al...@foxmail.com>
AuthorDate: Sat Apr 9 15:34:56 2022 +0800

    return decode value as getty standard
---
 protocol/dubbo/dubbo_codec.go | 34 +++++++++++++++++++++-------------
 protocol/dubbo/impl/codec.go  |  3 +++
 remoting/getty/readwriter.go  |  9 +++++----
 3 files changed, 29 insertions(+), 17 deletions(-)

diff --git a/protocol/dubbo/dubbo_codec.go b/protocol/dubbo/dubbo_codec.go
index 31b13dc00..03f228d00 100644
--- a/protocol/dubbo/dubbo_codec.go
+++ b/protocol/dubbo/dubbo_codec.go
@@ -34,7 +34,7 @@ import (
 	"dubbo.apache.org/dubbo-go/v3/common/logger"
 	"dubbo.apache.org/dubbo-go/v3/protocol"
 	"dubbo.apache.org/dubbo-go/v3/protocol/dubbo/impl"
-	"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
+	invct "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
 	"dubbo.apache.org/dubbo-go/v3/remoting"
 )
 
@@ -161,19 +161,25 @@ func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer,
 
 // Decode data, including request and response.
 func (c *DubboCodec) Decode(data []byte) (remoting.DecodeResult, int, error) {
+	var res remoting.DecodeResult
+
+	dataLen := len(data)
+	if dataLen < impl.HEADER_LENGTH { // not enough bytes for a complete header yet
+		return res, 0, nil
+	}
 	if c.isRequest(data) {
-		req, len, err := c.decodeRequest(data)
+		req, length, err := c.decodeRequest(data)
 		if err != nil {
-			return remoting.DecodeResult{}, len, perrors.WithStack(err)
+			return remoting.DecodeResult{}, length, perrors.WithStack(err)
 		}
-		return remoting.DecodeResult{IsRequest: true, Result: req}, len, perrors.WithStack(err)
+		return remoting.DecodeResult{IsRequest: true, Result: req}, length, perrors.WithStack(err)
 	}
 
-	resp, len, err := c.decodeResponse(data)
+	resp, length, err := c.decodeResponse(data)
 	if err != nil {
-		return remoting.DecodeResult{}, len, perrors.WithStack(err)
+		return remoting.DecodeResult{}, length, perrors.WithStack(err)
 	}
-	return remoting.DecodeResult{IsRequest: false, Result: resp}, len, perrors.WithStack(err)
+	return remoting.DecodeResult{IsRequest: false, Result: resp}, length, perrors.WithStack(err)
 }
 
 func (c *DubboCodec) isRequest(data []byte) bool {
@@ -182,16 +188,18 @@ func (c *DubboCodec) isRequest(data []byte) bool {
 
 // decode request
 func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) {
-	var request *remoting.Request = nil
+	var request *remoting.Request
 	buf := bytes.NewBuffer(data)
 	pkg := impl.NewDubboPackage(buf)
 	pkg.SetBody(make([]interface{}, 7))
 	err := pkg.Unmarshal()
 	if err != nil {
 		originErr := perrors.Cause(err)
-		if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
-			// FIXME
-			return nil, 0, originErr
+		if originErr == hessian.ErrHeaderNotEnough { // should not happen when called via DubboCodec.Decode, which already checks len(data) against impl.HEADER_LENGTH
+			return nil, 0, nil
+		}
+		if originErr == hessian.ErrBodyNotEnough {
+			return nil, hessian.HEADER_LENGTH + pkg.GetBodyLen(), nil
 		}
 		logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err)
 
@@ -223,8 +231,8 @@ func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error)
 		methodName = pkg.Service.Method
 		args = req[impl.ArgsKey].([]interface{})
 		attachments = req[impl.AttachmentsKey].(map[string]interface{})
-		invoc := invocation.NewRPCInvocationWithOptions(invocation.WithAttachments(attachments),
-			invocation.WithArguments(args), invocation.WithMethodName(methodName))
+		invoc := invct.NewRPCInvocationWithOptions(invct.WithAttachments(attachments),
+			invct.WithArguments(args), invct.WithMethodName(methodName))
 		request.Data = invoc
 
 	}
diff --git a/protocol/dubbo/impl/codec.go b/protocol/dubbo/impl/codec.go
index cc7abad48..95acbd9e0 100644
--- a/protocol/dubbo/impl/codec.go
+++ b/protocol/dubbo/impl/codec.go
@@ -157,6 +157,9 @@ func (c *ProtocolCodec) Decode(p *DubboPackage) error {
 			return err
 		}
 	}
+	if c.reader.Size() < p.GetBodyLen() {
+		return hessian.ErrBodyNotEnough
+	}
 	body, err := c.reader.Peek(p.GetBodyLen())
 	if err != nil {
 		return err
diff --git a/remoting/getty/readwriter.go b/remoting/getty/readwriter.go
index c78f354d6..48f2466c5 100644
--- a/remoting/getty/readwriter.go
+++ b/remoting/getty/readwriter.go
@@ -49,15 +49,16 @@ func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler {
 // and send to client each time. the Read can assemble it.
 func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
 	resp, length, err := (p.client.codec).Decode(data)
-	// err := pkg.Unmarshal(buf, p.client)
 	if err != nil {
-		if errors.Is(err, hessian.ErrHeaderNotEnough) || errors.Is(err, hessian.ErrBodyNotEnough) {
+		if errors.Is(err, hessian.ErrHeaderNotEnough) {
 			return nil, 0, nil
 		}
+		if errors.Is(err, hessian.ErrBodyNotEnough) {
+			return nil, length, nil
+		}
 
 		logger.Errorf("pkg.Unmarshal(ss:%+v, len(@data):%d) = error:%+v", ss, len(data), err)
-
-		return nil, length, err
+		return nil, 0, err
 	}
 
 	return resp, length, nil