You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/04/16 14:55:01 UTC

[dubbo-go] branch 1.5 updated: Fix: decode net stream bytes as getty rule (#1833)

This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 1.5
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/1.5 by this push:
     new 6266471cb Fix: decode net stream bytes as getty rule (#1833)
6266471cb is described below

commit 6266471cb855bdc2b77e352c17c955f5e508f725
Author: Jason Peng <lv...@gmail.com>
AuthorDate: Sat Apr 16 22:54:55 2022 +0800

    Fix: decode net stream bytes as getty rule (#1833)
---
 protocol/dubbo/dubbo_codec.go          | 49 ++++++++++++++++++++++------------
 protocol/dubbo/impl/codec.go           |  5 ++++
 remoting/codec.go                      |  2 +-
 remoting/getty/dubbo_codec_for_test.go | 21 ++++++++-------
 remoting/getty/listener.go             |  8 +++---
 remoting/getty/readwriter.go           | 36 +++++++++++--------------
 remoting/getty/readwriter_test.go      |  4 +--
 7 files changed, 71 insertions(+), 54 deletions(-)

diff --git a/protocol/dubbo/dubbo_codec.go b/protocol/dubbo/dubbo_codec.go
index 03534242b..64cb56c2b 100644
--- a/protocol/dubbo/dubbo_codec.go
+++ b/protocol/dubbo/dubbo_codec.go
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package dubbo
 
 import (
@@ -32,7 +33,7 @@ import (
 	"github.com/apache/dubbo-go/common/logger"
 	"github.com/apache/dubbo-go/protocol"
 	"github.com/apache/dubbo-go/protocol/dubbo/impl"
-	"github.com/apache/dubbo-go/protocol/invocation"
+	incvt "github.com/apache/dubbo-go/protocol/invocation"
 	"github.com/apache/dubbo-go/remoting"
 )
 
@@ -161,20 +162,27 @@ func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer,
 }
 
 // Decode data, including request and response.
-func (c *DubboCodec) Decode(data []byte) (remoting.DecodeResult, int, error) {
+func (c *DubboCodec) Decode(data []byte) (*remoting.DecodeResult, int, error) {
+	dataLen := len(data)
+	if dataLen < impl.HEADER_LENGTH { // check whether header bytes is enough or not
+		return nil, 0, nil
+	}
 	if c.isRequest(data) {
-		req, len, err := c.decodeRequest(data)
+		req, length, err := c.decodeRequest(data)
 		if err != nil {
-			return remoting.DecodeResult{}, len, perrors.WithStack(err)
+			return &remoting.DecodeResult{}, length, perrors.WithStack(err)
 		}
-		return remoting.DecodeResult{IsRequest: true, Result: req}, len, perrors.WithStack(err)
+		return &remoting.DecodeResult{IsRequest: true, Result: req}, length, perrors.WithStack(err)
 	}
 
-	resp, len, err := c.decodeResponse(data)
+	rsp, length, err := c.decodeResponse(data)
 	if err != nil {
-		return remoting.DecodeResult{}, len, perrors.WithStack(err)
+		return nil, length, perrors.WithStack(err)
+	}
+	if rsp == ((*remoting.Response)(nil)) {
+		return nil, length, err
 	}
-	return remoting.DecodeResult{IsRequest: false, Result: resp}, len, perrors.WithStack(err)
+	return &remoting.DecodeResult{IsRequest: false, Result: rsp}, length, perrors.WithStack(err)
 }
 
 func (c *DubboCodec) isRequest(data []byte) bool {
@@ -183,16 +191,19 @@ func (c *DubboCodec) isRequest(data []byte) bool {
 
 // decode request
 func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) {
-	var request *remoting.Request = nil
+	var request *remoting.Request
 	buf := bytes.NewBuffer(data)
 	pkg := impl.NewDubboPackage(buf)
 	pkg.SetBody(make([]interface{}, 7))
 	err := pkg.Unmarshal()
 	if err != nil {
 		originErr := perrors.Cause(err)
-		if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
-			//FIXME
-			return nil, 0, originErr
+		// this is impossible. because this case has been handled in 'protocol/dubbo/dubbo_codec.go:DubboCodec::Decode'
+		if originErr == hessian.ErrHeaderNotEnough {
+			return nil, 0, nil
+		}
+		if originErr == hessian.ErrBodyNotEnough {
+			return nil, hessian.HEADER_LENGTH + pkg.GetBodyLen(), nil
 		}
 		logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err)
 
@@ -224,8 +235,8 @@ func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error)
 		methodName = pkg.Service.Method
 		args = req[impl.ArgsKey].([]interface{})
 		attachments = req[impl.AttachmentsKey].(map[string]interface{})
-		invoc := invocation.NewRPCInvocationWithOptions(invocation.WithAttachments(attachments),
-			invocation.WithArguments(args), invocation.WithMethodName(methodName))
+		invoc := incvt.NewRPCInvocationWithOptions(incvt.WithAttachments(attachments),
+			incvt.WithArguments(args), incvt.WithMethodName(methodName))
 		request.Data = invoc
 
 	}
@@ -240,10 +251,14 @@ func (c *DubboCodec) decodeResponse(data []byte) (*remoting.Response, int, error
 	if err != nil {
 		originErr := perrors.Cause(err)
 		// if the data is very big, so the receive need much times.
-		if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
-			return nil, 0, originErr
+		// this is impossible. because this case has been handled in 'protocol/dubbo/dubbo_codec.go:DubboCodec::Decode'
+		if originErr == hessian.ErrHeaderNotEnough {
+			return nil, 0, nil
 		}
-		logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err)
+		if originErr == hessian.ErrBodyNotEnough {
+			return nil, hessian.HEADER_LENGTH + pkg.GetBodyLen(), nil
+		}
+		logger.Warnf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err)
 
 		return nil, 0, perrors.WithStack(err)
 	}
diff --git a/protocol/dubbo/impl/codec.go b/protocol/dubbo/impl/codec.go
index 1b3cb3629..50f0dc010 100644
--- a/protocol/dubbo/impl/codec.go
+++ b/protocol/dubbo/impl/codec.go
@@ -156,6 +156,11 @@ func (c *ProtocolCodec) Decode(p *DubboPackage) error {
 			return err
 		}
 	}
+
+	if c.reader.Size() < p.GetBodyLen() {
+		return hessian.ErrBodyNotEnough
+	}
+
 	body, err := c.reader.Peek(p.GetBodyLen())
 	if err != nil {
 		return err
diff --git a/remoting/codec.go b/remoting/codec.go
index 607d1643c..e5fd176c9 100644
--- a/remoting/codec.go
+++ b/remoting/codec.go
@@ -24,7 +24,7 @@ import (
 type Codec interface {
 	EncodeRequest(request *Request) (*bytes.Buffer, error)
 	EncodeResponse(response *Response) (*bytes.Buffer, error)
-	Decode(data []byte) (DecodeResult, int, error)
+	Decode(data []byte) (*DecodeResult, int, error)
 }
 
 type DecodeResult struct {
diff --git a/remoting/getty/dubbo_codec_for_test.go b/remoting/getty/dubbo_codec_for_test.go
index ec414402d..524afde5b 100644
--- a/remoting/getty/dubbo_codec_for_test.go
+++ b/remoting/getty/dubbo_codec_for_test.go
@@ -155,19 +155,19 @@ func (c *DubboTestCodec) EncodeResponse(response *remoting.Response) (*bytes.Buf
 }
 
 // Decode data, including request and response.
-func (c *DubboTestCodec) Decode(data []byte) (remoting.DecodeResult, int, error) {
+func (c *DubboTestCodec) Decode(data []byte) (*remoting.DecodeResult, int, error) {
 	if c.isRequest(data) {
-		req, len, err := c.decodeRequest(data)
+		req, length, err := c.decodeRequest(data)
 		if err != nil {
-			return remoting.DecodeResult{}, len, perrors.WithStack(err)
+			return &remoting.DecodeResult{}, length, perrors.WithStack(err)
 		}
-		return remoting.DecodeResult{IsRequest: true, Result: req}, len, perrors.WithStack(err)
+		return &remoting.DecodeResult{IsRequest: true, Result: req}, length, perrors.WithStack(err)
 	} else {
-		resp, len, err := c.decodeResponse(data)
+		rsp, length, err := c.decodeResponse(data)
 		if err != nil {
-			return remoting.DecodeResult{}, len, perrors.WithStack(err)
+			return &remoting.DecodeResult{}, length, perrors.WithStack(err)
 		}
-		return remoting.DecodeResult{IsRequest: false, Result: resp}, len, perrors.WithStack(err)
+		return &remoting.DecodeResult{IsRequest: false, Result: rsp}, length, perrors.WithStack(err)
 	}
 }
 
@@ -232,8 +232,11 @@ func (c *DubboTestCodec) decodeResponse(data []byte) (*remoting.Response, int, e
 	if err != nil {
 		originErr := perrors.Cause(err)
 		// if the data is very big, so the receive need much times.
-		if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
-			return nil, 0, originErr
+		if originErr == hessian.ErrHeaderNotEnough { // this is impossible, as dubbo_codec.go:DubboCodec::Decode() line 167
+			return nil, 0, nil
+		}
+		if originErr == hessian.ErrBodyNotEnough {
+			return nil, hessian.HEADER_LENGTH + pkg.GetBodyLen(), nil
 		}
 		return nil, 0, perrors.WithStack(err)
 	}
diff --git a/remoting/getty/listener.go b/remoting/getty/listener.go
index 512a82ce8..90035e55e 100644
--- a/remoting/getty/listener.go
+++ b/remoting/getty/listener.go
@@ -100,8 +100,8 @@ func (h *RpcClientHandler) OnClose(session getty.Session) {
 
 // OnMessage get response from getty server, and update the session to the getty client session list
 func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
-	result, ok := pkg.(remoting.DecodeResult)
-	if !ok {
+	result, ok := pkg.(*remoting.DecodeResult)
+	if !ok || result == ((*remoting.DecodeResult)(nil)) {
 		logger.Errorf("illegal package")
 		return
 	}
@@ -241,8 +241,8 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
 	}
 	h.rwlock.Unlock()
 
-	decodeResult, drOK := pkg.(remoting.DecodeResult)
-	if !drOK {
+	decodeResult, drOK := pkg.(*remoting.DecodeResult)
+	if !drOK || decodeResult == ((*remoting.DecodeResult)(nil)) {
 		logger.Errorf("illegal package{%#v}", pkg)
 		return
 	}
diff --git a/remoting/getty/readwriter.go b/remoting/getty/readwriter.go
index 6aa4fb0be..400527c12 100644
--- a/remoting/getty/readwriter.go
+++ b/remoting/getty/readwriter.go
@@ -18,15 +18,12 @@
 package getty
 
 import (
-	"errors"
 	"reflect"
 )
 
 import (
 	"github.com/apache/dubbo-getty"
 
-	hessian "github.com/apache/dubbo-go-hessian2"
-
 	perrors "github.com/pkg/errors"
 )
 
@@ -48,19 +45,17 @@ func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler {
 // Read data from server. if the package size from server is larger than 4096 byte, server will read 4096 byte
 // and send to client each time. the Read can assemble it.
 func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
-	resp, length, err := (p.client.codec).Decode(data)
-	//err := pkg.Unmarshal(buf, p.client)
+	rsp, length, err := (p.client.codec).Decode(data)
 	if err != nil {
-		if errors.Is(err, hessian.ErrHeaderNotEnough) || errors.Is(err, hessian.ErrBodyNotEnough) {
-			return nil, 0, nil
-		}
-
-		logger.Errorf("pkg.Unmarshal(ss:%+v, len(@data):%d) = error:%+v", ss, len(data), err)
-
+		err = perrors.WithStack(err)
+	}
+	if rsp == ((*remoting.DecodeResult)(nil)) {
 		return nil, length, err
 	}
-
-	return resp, length, nil
+	if rsp.Result == ((*remoting.Response)(nil)) || rsp.Result == ((*remoting.Request)(nil)) {
+		return nil, length, err
+	}
+	return rsp, length, err
 }
 
 // Write send the data to server
@@ -110,15 +105,15 @@ func NewRpcServerPackageHandler(server *Server) *RpcServerPackageHandler {
 // and send to client each time. the Read can assemble it.
 func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
 	req, length, err := (p.server.codec).Decode(data)
-	//resp,len, err := (*p.).DecodeResponse(buf)
 	if err != nil {
-		if errors.Is(err, hessian.ErrHeaderNotEnough) || errors.Is(err, hessian.ErrBodyNotEnough) {
-			return nil, 0, nil
-		}
-
-		logger.Errorf("pkg.Unmarshal(ss:%+v, len(@data):%d) = error:%+v", ss, len(data), err)
+		err = perrors.WithStack(err)
+	}
+	if req == ((*remoting.DecodeResult)(nil)) {
+		return nil, length, err
+	}
 
-		return nil, 0, err
+	if req.Result == ((*remoting.Request)(nil)) || req.Result == ((*remoting.Response)(nil)) {
+		return nil, length, err // as getty rule
 	}
 
 	return req, length, err
@@ -148,5 +143,4 @@ func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) ([]by
 
 	logger.Errorf("illegal pkg:%+v\n, it is %+v", pkg, reflect.TypeOf(pkg))
 	return nil, perrors.New("invalid rpc response")
-
 }
diff --git a/remoting/getty/readwriter_test.go b/remoting/getty/readwriter_test.go
index eb4891daa..3d8fddd2e 100644
--- a/remoting/getty/readwriter_test.go
+++ b/remoting/getty/readwriter_test.go
@@ -197,8 +197,8 @@ func testDecodeTCPPackage(t *testing.T, svr *Server, client *Client) {
 	assert.True(t, incompletePkgLen >= impl.HEADER_LENGTH, "header buffer too short")
 	incompletePkg := pkgBytes[0 : incompletePkgLen-1]
 	pkg, pkgLen, err := pkgReadHandler.Read(nil, incompletePkg)
-	assert.NoError(t, err)
-	assert.Equal(t, pkg, nil)
+	assert.Equal(t, err.Error(), "body buffer too short")
+	assert.Equal(t, pkg.(*remoting.DecodeResult).Result, nil)
 	assert.Equal(t, pkgLen, 0)
 }