You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by fa...@apache.org on 2020/07/26 08:28:53 UTC
[dubbo-go] 03/04: Fix: merge stash
This is an automated email from the ASF dual-hosted git repository.
fangyc pushed a commit to branch refact-seri
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
commit 9b260cf13b369e4797c35528fc8268b735768ff4
Merge: 0e2de75 1a42f33
Author: fangyincheng <fa...@sina.com>
AuthorDate: Sun Jul 26 04:49:12 2020 +0800
Fix: merge stash
common/constant/key.go | 42 +-
common/url.go | 11 +-
config/application_config.go | 10 +
protocol/dubbo/client.go | 70 ---
protocol/dubbo/client_test.go | 301 -------------
protocol/dubbo/dubbo_codec.go | 366 +++++++++++++++
protocol/dubbo/dubbo_invoker.go | 55 ++-
protocol/dubbo/dubbo_invoker_test.go | 163 ++++++-
protocol/dubbo/dubbo_protocol.go | 149 +++++-
protocol/dubbo/dubbo_protocol_test.go | 78 +++-
protocol/dubbo/impl/codec_test.go | 2 +-
protocol/dubbo/impl/const.go | 18 +-
protocol/dubbo/impl/proto.go | 12 +-
protocol/dubbo/impl/remoting/client_impl.go | 391 ----------------
protocol/dubbo/server.go | 125 ------
protocol/invocation/rpcinvocation.go | 7 +
protocol/jsonrpc/http_test.go | 22 +-
protocol/jsonrpc/jsonrpc_invoker.go | 3 +-
.../protocolwrapper/protocol_filter_wrapper.go | 3 +
.../impl/remoting/errors.go => remoting/codec.go | 28 ++
remoting/exchange.go | 144 ++++++
remoting/exchange_client.go | 227 ++++++++++
.../errors.go => remoting/exchange_server.go | 36 ++
.../impl/remoting => remoting/getty}/config.go | 66 ++-
remoting/getty/dubbo_codec_for_test.go | 382 ++++++++++++++++
remoting/getty/getty_client.go | 219 +++++++++
remoting/getty/getty_client_test.go | 497 +++++++++++++++++++++
.../getty/getty_server.go | 98 ++--
.../getty/listener.go | 204 ++++-----
.../getty/listener_test.go | 18 +-
.../dubbo/impl/remoting => remoting/getty}/pool.go | 82 ++--
.../impl/remoting => remoting/getty}/readwriter.go | 124 ++---
32 files changed, 2667 insertions(+), 1286 deletions(-)
diff --cc common/constant/key.go
index 6e73183,d8eff3a..06b37cf
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@@ -22,27 -22,26 +22,27 @@@ const
)
const (
- GROUP_KEY = "group"
- VERSION_KEY = "version"
- INTERFACE_KEY = "interface"
- PATH_KEY = "path"
- SERVICE_KEY = "service"
- METHODS_KEY = "methods"
- TIMEOUT_KEY = "timeout"
- CATEGORY_KEY = "category"
- CHECK_KEY = "check"
- ENABLED_KEY = "enabled"
- SIDE_KEY = "side"
- OVERRIDE_PROVIDERS_KEY = "providerAddresses"
- BEAN_NAME_KEY = "bean.name"
- GENERIC_KEY = "generic"
- CLASSIFIER_KEY = "classifier"
- TOKEN_KEY = "token"
- LOCAL_ADDR = "local-addr"
- REMOTE_ADDR = "remote-addr"
- DUBBO_KEY = "dubbo"
- RELEASE_KEY = "release"
- ANYHOST_KEY = "anyhost"
+ GROUP_KEY = "group"
+ VERSION_KEY = "version"
+ INTERFACE_KEY = "interface"
+ PATH_KEY = "path"
+ SERVICE_KEY = "service"
+ METHODS_KEY = "methods"
+ TIMEOUT_KEY = "timeout"
+ CATEGORY_KEY = "category"
+ CHECK_KEY = "check"
+ ENABLED_KEY = "enabled"
+ SIDE_KEY = "side"
+ OVERRIDE_PROVIDERS_KEY = "providerAddresses"
+ BEAN_NAME_KEY = "bean.name"
+ GENERIC_KEY = "generic"
+ CLASSIFIER_KEY = "classifier"
+ TOKEN_KEY = "token"
+ LOCAL_ADDR = "local-addr"
+ REMOTE_ADDR = "remote-addr"
- PATH_SEPARATOR = "/"
+ DEFAULT_REMOTING_TIMEOUT = 3000
++ RELEASE_KEY = "release"
++ ANYHOST_KEY = "anyhost"
)
const (
diff --cc common/url.go
index e58f652,01c623e..5a3e69f
--- a/common/url.go
+++ b/common/url.go
@@@ -316,11 -313,16 +317,15 @@@ func (c URL) Key() string
"%s://%s:%s@%s:%s/?interface=%s&group=%s&version=%s",
c.Protocol, c.Username, c.Password, c.Ip, c.Port, c.Service(), c.GetParam(constant.GROUP_KEY, ""), c.GetParam(constant.VERSION_KEY, ""))
return buildString
- //return c.ServiceKey()
}
-// ServiceKey ...
+// ServiceKey gets a unique key of a service.
func (c URL) ServiceKey() string {
- intf := c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/"))
+ return ServiceKey(c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")),
+ c.GetParam(constant.GROUP_KEY, ""), c.GetParam(constant.VERSION_KEY, ""))
+ }
+
+ func ServiceKey(intf string, group string, version string) string {
if intf == "" {
return ""
}
diff --cc config/application_config.go
index 16e8417,1d9306c..cc02da3
--- a/config/application_config.go
+++ b/config/application_config.go
@@@ -41,7 -40,17 +41,17 @@@ func (*ApplicationConfig) Prefix() stri
return constant.DUBBO + ".application."
}
+ // nolint
+ func (c *ApplicationConfig) Id() string {
+ return ""
+ }
+
+ // SetId ...
+ func (c *ApplicationConfig) SetId(id string) {
+
+ }
+
-// UnmarshalYAML ...
+// UnmarshalYAML unmarshals the ApplicationConfig by @unmarshal function
func (c *ApplicationConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
if err := defaults.Set(c); err != nil {
return err
diff --cc protocol/dubbo/dubbo_codec.go
index 0000000,f4a39cb..e575c79
mode 000000,100644..100644
--- a/protocol/dubbo/dubbo_codec.go
+++ b/protocol/dubbo/dubbo_codec.go
@@@ -1,0 -1,356 +1,366 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package dubbo
+
+ import (
- "bufio"
+ "bytes"
- "fmt"
++ "github.com/apache/dubbo-go/protocol/dubbo/impl"
+ "strconv"
+ "time"
+ )
+
+ import (
+ hessian "github.com/apache/dubbo-go-hessian2"
+ perrors "github.com/pkg/errors"
+ )
+
+ import (
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/logger"
+ "github.com/apache/dubbo-go/protocol"
+ "github.com/apache/dubbo-go/protocol/invocation"
+ "github.com/apache/dubbo-go/remoting"
+ )
+
+ //SerialID serial ID
+ type SerialID byte
+
+ const (
+ // S_Dubbo dubbo serial id
+ S_Dubbo SerialID = 2
+ )
+
+ func init() {
+ codec := &DubboCodec{}
+ // this is for registry dubboCodec of dubbo protocol
+ remoting.RegistryCodec("dubbo", codec)
+ }
+
+ // DubboPackage is for hessian encode/decode. If we refactor hessian, it will also be refactored.
-type DubboPackage struct {
- Header hessian.DubboHeader
- Service hessian.Service
- Body interface{}
- Err error
-}
++//type DubboPackage struct {
++// Header hessian.DubboHeader
++// Service hessian.Service
++// Body interface{}
++// Err error
++//}
+
+ // String of DubboPackage
-func (p DubboPackage) String() string {
- return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body)
-}
++//func (p DubboPackage) String() string {
++// return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body)
++//}
+
+ // nolint
-func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
- codec := hessian.NewHessianCodec(nil)
-
- pkg, err := codec.Write(p.Service, p.Header, p.Body)
- if err != nil {
- return nil, perrors.WithStack(err)
- }
-
- return bytes.NewBuffer(pkg), nil
-}
++//func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
++// codec := hessian.NewHessianCodec(nil)
++//
++// pkg, err := codec.Write(p.Service, p.Header, p.Body)
++// if err != nil {
++// return nil, perrors.WithStack(err)
++// }
++//
++// return bytes.NewBuffer(pkg), nil
++//}
+
+ // nolint
-func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, resp *remoting.Response) error {
- // fix issue https://github.com/apache/dubbo-go/issues/380
- bufLen := buf.Len()
- if bufLen < hessian.HEADER_LENGTH {
- return perrors.WithStack(hessian.ErrHeaderNotEnough)
- }
-
- codec := hessian.NewHessianCodec(bufio.NewReaderSize(buf, bufLen))
-
- // read header
- err := codec.ReadHeader(&p.Header)
- if err != nil {
- return perrors.WithStack(err)
- }
-
- if resp != nil { // for client
- if (p.Header.Type & hessian.PackageRequest) != 0x00 {
- // size of this array must be '7'
- // https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272
- p.Body = make([]interface{}, 7)
- } else {
- pendingRsp := remoting.GetPendingResponse(remoting.SequenceType(p.Header.ID))
- if pendingRsp == nil {
- return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
- }
- p.Body = &hessian.Response{RspObj: pendingRsp.Reply}
- }
- }
- // read body
- err = codec.ReadBody(p.Body)
- return perrors.WithStack(err)
-}
++//func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, resp *remoting.Response) error {
++// // fix issue https://github.com/apache/dubbo-go/issues/380
++// bufLen := buf.Len()
++// if bufLen < hessian.HEADER_LENGTH {
++// return perrors.WithStack(hessian.ErrHeaderNotEnough)
++// }
++//
++// codec := hessian.NewHessianCodec(bufio.NewReaderSize(buf, bufLen))
++//
++// // read header
++// err := codec.ReadHeader(&p.Header)
++// if err != nil {
++// return perrors.WithStack(err)
++// }
++//
++// if resp != nil { // for client
++// if (p.Header.Type & hessian.PackageRequest) != 0x00 {
++// // size of this array must be '7'
++// // https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272
++// p.Body = make([]interface{}, 7)
++// } else {
++// pendingRsp := remoting.GetPendingResponse(remoting.SequenceType(p.Header.ID))
++// if pendingRsp == nil {
++// return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
++// }
++// p.Body = &hessian.Response{RspObj: pendingRsp.Reply}
++// }
++// }
++// // read body
++// err = codec.ReadBody(p.Body)
++// return perrors.WithStack(err)
++//}
+
+ // DubboCodec. It is implements remoting.Codec
+ type DubboCodec struct {
+ }
+
+ // encode request for transport
+ func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) {
+ if request.Event {
+ return c.encodeHeartbeartReqeust(request)
+ }
+
+ invoc, ok := request.Data.(*protocol.Invocation)
+ if !ok {
+ logger.Errorf("encode request failed for parameter type :%+v", request)
+ return nil, perrors.Errorf("encode request failed for parameter type :%+v", request)
+ }
+ invocation := *invoc
+
- p := &DubboPackage{}
- p.Service.Path = invocation.AttachmentsByKey(constant.PATH_KEY, "")
- p.Service.Interface = invocation.AttachmentsByKey(constant.INTERFACE_KEY, "")
- p.Service.Version = invocation.AttachmentsByKey(constant.VERSION_KEY, "")
- p.Service.Group = invocation.AttachmentsByKey(constant.GROUP_KEY, "")
- p.Service.Method = invocation.MethodName()
-
++ svc := impl.Service{}
++ svc.Path = invocation.AttachmentsByKey(constant.PATH_KEY, "")
++ svc.Interface = invocation.AttachmentsByKey(constant.INTERFACE_KEY, "")
++ svc.Version = invocation.AttachmentsByKey(constant.VERSION_KEY, "")
++ svc.Group = invocation.AttachmentsByKey(constant.GROUP_KEY, "")
++ svc.Method = invocation.MethodName()
+ timeout, err := strconv.Atoi(invocation.AttachmentsByKey(constant.TIMEOUT_KEY, strconv.Itoa(constant.DEFAULT_REMOTING_TIMEOUT)))
+ if err != nil {
+ // it will be wrapped in readwrite.Write .
+ return nil, perrors.WithStack(err)
+ }
- p.Service.Timeout = time.Duration(timeout)
-
- p.Header.SerialID = byte(S_Dubbo)
- p.Header.ID = request.ID
++ svc.Timeout = time.Duration(timeout)
++
++ header := impl.DubboHeader{}
++ serialization := invocation.AttachmentsByKey(constant.SERIALIZATION_KEY, constant.HESSIAN2_SERIALIZATION)
++ if serialization == constant.HESSIAN2_SERIALIZATION {
++ header.SerialID = constant.S_Hessian2
++ } else if serialization == constant.PROTOBUF_SERIALIZATION {
++ header.SerialID = constant.S_Proto
++ }
++ header.ID = request.ID
+ if request.TwoWay {
- p.Header.Type = hessian.PackageRequest_TwoWay
++ header.Type = impl.PackageRequest_TwoWay
+ } else {
- p.Header.Type = hessian.PackageRequest
++ header.Type = impl.PackageRequest
+ }
+
- p.Body = hessian.NewRequest(invocation.Arguments(), invocation.Attachments())
-
- codec := hessian.NewHessianCodec(nil)
++ pkg := &impl.DubboPackage{
++ Header: header,
++ Service: svc,
++ Body: impl.NewRequestPayload(invocation.Arguments(), invocation.Attachments()),
++ Err: nil,
++ Codec: impl.NewDubboCodec(nil),
++ }
+
- pkg, err := codec.Write(p.Service, p.Header, p.Body)
- if err != nil {
++ if err := impl.LoadSerializer(pkg); err != nil {
+ return nil, perrors.WithStack(err)
+ }
+
- return bytes.NewBuffer(pkg), nil
++ return pkg.Marshal()
+ }
+
+ // encode heartbeart request
+ func (c *DubboCodec) encodeHeartbeartReqeust(request *remoting.Request) (*bytes.Buffer, error) {
- pkg := &DubboPackage{}
- pkg.Body = []interface{}{}
- pkg.Header.ID = request.ID
- pkg.Header.Type = hessian.PackageHeartbeat
- pkg.Header.SerialID = byte(S_Dubbo)
-
- codec := hessian.NewHessianCodec(nil)
++ header := impl.DubboHeader{
++ Type: impl.PackageHeartbeat,
++ SerialID: constant.S_Hessian2,
++ ID: request.ID,
++ }
+
- byt, err := codec.Write(pkg.Service, pkg.Header, pkg.Body)
- if err != nil {
- return nil, perrors.WithStack(err)
++ pkg := &impl.DubboPackage{
++ Header: header,
++ Service: impl.Service{},
++ Body: impl.NewRequestPayload([]interface{}{}, nil),
++ Err: nil,
++ Codec: impl.NewDubboCodec(nil),
+ }
+
- return bytes.NewBuffer(byt), nil
++ if err := impl.LoadSerializer(pkg); err != nil {
++ return nil, err
++ }
++ return pkg.Marshal()
+ }
+
+ // encode response
+ func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer, error) {
- var ptype = hessian.PackageResponse
++ var ptype = impl.PackageResponse
+ if response.IsHeartbeat() {
- ptype = hessian.PackageHeartbeat
++ ptype = impl.PackageHeartbeat
+ }
- resp := &DubboPackage{
- Header: hessian.DubboHeader{
++ resp := &impl.DubboPackage{
++ Header: impl.DubboHeader{
+ SerialID: response.SerialID,
+ Type: ptype,
+ ID: response.ID,
+ ResponseStatus: response.Status,
+ },
+ }
+ if !response.IsHeartbeat() {
+ resp.Body = &hessian.Response{
+ RspObj: response.Result.(protocol.RPCResult).Rest,
+ Exception: response.Result.(protocol.RPCResult).Err,
+ Attachments: response.Result.(protocol.RPCResult).Attrs,
+ }
+ }
+
- codec := hessian.NewHessianCodec(nil)
++ codec := impl.NewDubboCodec(nil)
+
- pkg, err := codec.Write(resp.Service, resp.Header, resp.Body)
++ pkg, err := codec.Encode(*resp)
+ if err != nil {
+ return nil, perrors.WithStack(err)
+ }
+
+ return bytes.NewBuffer(pkg), nil
+ }
+
+ // Decode data, including request and response.
+ func (c *DubboCodec) Decode(data []byte) (remoting.DecodeResult, int, error) {
+ if c.isRequest(data) {
+ req, len, err := c.decodeRequest(data)
+ if err != nil {
+ return remoting.DecodeResult{}, len, perrors.WithStack(err)
+ }
+ return remoting.DecodeResult{IsRequest: true, Result: req}, len, perrors.WithStack(err)
+ } else {
+ resp, len, err := c.decodeResponse(data)
+ if err != nil {
+ return remoting.DecodeResult{}, len, perrors.WithStack(err)
+ }
+ return remoting.DecodeResult{IsRequest: false, Result: resp}, len, perrors.WithStack(err)
+ }
+ }
+
+ func (c *DubboCodec) isRequest(data []byte) bool {
+ if data[2]&byte(0x80) == 0x00 {
+ return false
+ }
+ return true
+ }
+
+ // decode request
+ func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) {
- pkg := &DubboPackage{
- Body: make([]interface{}, 7),
- }
+ var request *remoting.Request = nil
+ buf := bytes.NewBuffer(data)
- err := pkg.Unmarshal(buf, nil)
++ pkg := impl.NewDubboPackage(buf)
++ pkg.SetBody(make([]interface{}, 7))
++ err := pkg.Unmarshal()
+ if err != nil {
+ originErr := perrors.Cause(err)
+ if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
+ //FIXME
+ return nil, 0, originErr
+ }
+ logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err)
+
+ return request, 0, perrors.WithStack(err)
+ }
+ request = &remoting.Request{
+ ID: pkg.Header.ID,
+ SerialID: pkg.Header.SerialID,
- TwoWay: pkg.Header.Type&hessian.PackageRequest_TwoWay != 0x00,
- Event: pkg.Header.Type&hessian.PackageHeartbeat != 0x00,
++ TwoWay: pkg.Header.Type&impl.PackageRequest_TwoWay != 0x00,
++ Event: pkg.Header.Type&impl.PackageHeartbeat != 0x00,
+ }
- if (pkg.Header.Type & hessian.PackageHeartbeat) == 0x00 {
++ if (pkg.Header.Type & impl.PackageHeartbeat) == 0x00 {
+ // convert params of request
+ req := pkg.Body.([]interface{}) // length of body should be 7
+ if len(req) > 0 {
+ //invocation := request.Data.(*invocation.RPCInvocation)
+ var methodName string
+ var args []interface{}
+ var attachments map[string]string = make(map[string]string)
+ if req[0] != nil {
+ //dubbo version
+ request.Version = req[0].(string)
+ }
+ if req[1] != nil {
+ //path
+ attachments[constant.PATH_KEY] = req[1].(string)
+ }
+ if req[2] != nil {
+ //version
+ attachments[constant.VERSION_KEY] = req[2].(string)
+ }
+ if req[3] != nil {
+ //method
+ methodName = req[3].(string)
+ }
+ if req[4] != nil {
+ //ignore argTypes
+ }
+ if req[5] != nil {
+ args = req[5].([]interface{})
+ }
+ if req[6] != nil {
+ attachments = req[6].(map[string]string)
+ }
+ invoc := invocation.NewRPCInvocationWithOptions(invocation.WithAttachments(attachments),
+ invocation.WithArguments(args), invocation.WithMethodName(methodName))
+ request.Data = invoc
+ }
+ }
+ return request, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil
+ }
+
+ // decode response
+ func (c *DubboCodec) decodeResponse(data []byte) (*remoting.Response, int, error) {
- pkg := &DubboPackage{}
+ buf := bytes.NewBuffer(data)
++ pkg := impl.NewDubboPackage(buf)
+ response := &remoting.Response{}
- err := pkg.Unmarshal(buf, response)
++ err := pkg.Unmarshal()
+ if err != nil {
+ originErr := perrors.Cause(err)
+ // if the data is very big, so the receive need much times.
+ if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
+ return nil, 0, originErr
+ }
+ logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err)
+
+ return nil, 0, perrors.WithStack(err)
+ }
+ response = &remoting.Response{
+ ID: pkg.Header.ID,
+ //Version: pkg.Header.,
+ SerialID: pkg.Header.SerialID,
+ Status: pkg.Header.ResponseStatus,
- Event: (pkg.Header.Type & hessian.PackageHeartbeat) != 0,
++ Event: (pkg.Header.Type & impl.PackageHeartbeat) != 0,
+ }
+ var error error
- if pkg.Header.Type&hessian.PackageHeartbeat != 0x00 {
- if pkg.Header.Type&hessian.PackageResponse != 0x00 {
++ if pkg.Header.Type&impl.PackageHeartbeat != 0x00 {
++ if pkg.Header.Type&impl.PackageResponse != 0x00 {
+ logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", pkg.Header, pkg.Body)
+ if pkg.Err != nil {
+ logger.Errorf("rpc heartbeat response{error: %#v}", pkg.Err)
+ error = pkg.Err
+ }
+ } else {
+ logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", pkg.Header, pkg.Service, pkg.Body)
+ response.Status = hessian.Response_OK
+ //reply(session, p, hessian.PackageHeartbeat)
+ }
+ return response, hessian.HEADER_LENGTH + pkg.Header.BodyLen, error
+ }
+ logger.Debugf("get rpc response{header: %#v, body: %#v}", pkg.Header, pkg.Body)
+ rpcResult := &protocol.RPCResult{}
+ response.Result = rpcResult
- if pkg.Header.Type&hessian.PackageRequest == 0x00 {
++ if pkg.Header.Type&impl.PackageRequest == 0x00 {
+ if pkg.Err != nil {
+ rpcResult.Err = pkg.Err
+ } else if pkg.Body.(*hessian.Response).Exception != nil {
+ rpcResult.Err = pkg.Body.(*hessian.Response).Exception
+ response.Error = rpcResult.Err
+ }
+ rpcResult.Attrs = pkg.Body.(*hessian.Response).Attachments
+ rpcResult.Rest = pkg.Body.(*hessian.Response).RspObj
+ }
+
+ return response, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil
+ }
diff --cc protocol/dubbo/dubbo_invoker.go
index 1907c38,d94169d..e51cc5f
--- a/protocol/dubbo/dubbo_invoker.go
+++ b/protocol/dubbo/dubbo_invoker.go
@@@ -128,7 -141,22 +146,22 @@@ func (di *DubboInvoker) Invoke(ctx cont
return &result
}
+ // get timeout including methodConfig
+ func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) time.Duration {
+ var timeout = di.GetUrl().GetParam(strings.Join([]string{constant.METHOD_KEYS, invocation.MethodName(), constant.TIMEOUT_KEY}, "."), "")
+ if len(timeout) != 0 {
+ if t, err := time.ParseDuration(timeout); err == nil {
+ // config timeout into attachment
+ invocation.SetAttachments(constant.TIMEOUT_KEY, strconv.Itoa(int(t.Milliseconds())))
+ return t
+ }
+ }
+ // set timeout into invocation at method level
+ invocation.SetAttachments(constant.TIMEOUT_KEY, strconv.Itoa(int(di.timeout.Milliseconds())))
+ return di.timeout
+ }
+
-// Destroy ...
+// Destroy destroy dubbo client invoker.
func (di *DubboInvoker) Destroy() {
di.quitOnce.Do(func() {
for {
diff --cc protocol/dubbo/dubbo_invoker_test.go
index e96e859,88c1910..9585461
--- a/protocol/dubbo/dubbo_invoker_test.go
+++ b/protocol/dubbo/dubbo_invoker_test.go
@@@ -93,3 -92,143 +92,143 @@@ func TestDubboInvoker_Invoke(t *testing
proto.Destroy()
lock.Unlock()
}
+
+ func InitTest(t *testing.T) (protocol.Protocol, common.URL) {
+
+ hessian.RegisterPOJO(&User{})
+
- methods, err := common.ServiceMap.Register("dubbo", &UserProvider{})
++ methods, err := common.ServiceMap.Register("", "dubbo", &UserProvider{})
+ assert.NoError(t, err)
+ assert.Equal(t, "GetBigPkg,GetUser,GetUser0,GetUser1,GetUser2,GetUser3,GetUser4,GetUser5,GetUser6", methods)
+
+ // config
+ getty.SetClientConf(getty.ClientConfig{
+ ConnectionNum: 2,
+ HeartbeatPeriod: "5s",
+ SessionTimeout: "20s",
+ PoolTTL: 600,
+ PoolSize: 64,
+ GettySessionParam: getty.GettySessionParam{
+ CompressEncoding: false,
+ TcpNoDelay: true,
+ TcpKeepAlive: true,
+ KeepAlivePeriod: "120s",
+ TcpRBufSize: 262144,
+ TcpWBufSize: 65536,
+ PkgWQSize: 512,
+ TcpReadTimeout: "4s",
+ TcpWriteTimeout: "5s",
+ WaitTimeout: "1s",
+ MaxMsgLen: 10240000000,
+ SessionName: "client",
+ },
+ })
+ getty.SetServerConfig(getty.ServerConfig{
+ SessionNumber: 700,
+ SessionTimeout: "20s",
+ GettySessionParam: getty.GettySessionParam{
+ CompressEncoding: false,
+ TcpNoDelay: true,
+ TcpKeepAlive: true,
+ KeepAlivePeriod: "120s",
+ TcpRBufSize: 262144,
+ TcpWBufSize: 65536,
+ PkgWQSize: 512,
+ TcpReadTimeout: "1s",
+ TcpWriteTimeout: "5s",
+ WaitTimeout: "1s",
+ MaxMsgLen: 10240000000,
+ SessionName: "server",
+ }})
+
+ // Export
+ proto := GetProtocol()
+ url, err := common.NewURL("dubbo://127.0.0.1:20702/UserProvider?anyhost=true&" +
+ "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
+ "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
+ "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
+ "side=provider&timeout=3000×tamp=1556509797245&bean.name=UserProvider")
+ assert.NoError(t, err)
+ proto.Export(&proxy_factory.ProxyInvoker{
+ BaseInvoker: *protocol.NewBaseInvoker(url),
+ })
+
+ time.Sleep(time.Second * 2)
+
+ return proto, url
+ }
+
+ //////////////////////////////////
+ // provider
+ //////////////////////////////////
+
+ type (
+ User struct {
+ Id string `json:"id"`
+ Name string `json:"name"`
+ }
+
+ UserProvider struct {
+ user map[string]User
+ }
+ )
+
+ // size:4801228
+ func (u *UserProvider) GetBigPkg(ctx context.Context, req []interface{}, rsp *User) error {
+ argBuf := new(bytes.Buffer)
+ for i := 0; i < 400; i++ {
+ // use chinese for test
+ argBuf.WriteString("击鼓其镗,踊跃用兵。土国城漕,我独南行。从孙子仲,平陈与宋。不我以归,忧心有忡。爰居爰处?爰丧其马?于以求之?于林之下。死生契阔,与子成说。执子之手,与子偕老。于嗟阔兮,不我活兮。于嗟洵兮,不我信兮。")
+ argBuf.WriteString("击鼓其镗,踊跃用兵。土国城漕,我独南行。从孙子仲,平陈与宋。不我以归,忧心有忡。爰居爰处?爰丧其马?于以求之?于林之下。死生契阔,与子成说。执子之手,与子偕老。于嗟阔兮,不我活兮。于嗟洵兮,不我信兮。")
+ }
+ rsp.Id = argBuf.String()
+ rsp.Name = argBuf.String()
+ return nil
+ }
+
+ func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User) error {
+ rsp.Id = req[0].(string)
+ rsp.Name = req[1].(string)
+ return nil
+ }
+
+ func (u *UserProvider) GetUser0(id string, k *User, name string) (User, error) {
+ return User{Id: id, Name: name}, nil
+ }
+
+ func (u *UserProvider) GetUser1() error {
+ return nil
+ }
+
+ func (u *UserProvider) GetUser2() error {
+ return perrors.New("error")
+ }
+
+ func (u *UserProvider) GetUser3(rsp *[]interface{}) error {
+ *rsp = append(*rsp, User{Id: "1", Name: "username"})
+ return nil
+ }
+
+ func (u *UserProvider) GetUser4(ctx context.Context, req []interface{}) ([]interface{}, error) {
+
+ return []interface{}{User{Id: req[0].([]interface{})[0].(string), Name: req[0].([]interface{})[1].(string)}}, nil
+ }
+
+ func (u *UserProvider) GetUser5(ctx context.Context, req []interface{}) (map[interface{}]interface{}, error) {
+ return map[interface{}]interface{}{"key": User{Id: req[0].(map[interface{}]interface{})["id"].(string), Name: req[0].(map[interface{}]interface{})["name"].(string)}}, nil
+ }
+
+ func (u *UserProvider) GetUser6(id int64) (*User, error) {
+ if id == 0 {
+ return nil, nil
+ }
+ return &User{Id: "1"}, nil
+ }
+
+ func (u *UserProvider) Reference() string {
+ return "UserProvider"
+ }
+
+ func (u User) JavaClassName() string {
+ return "com.ikurento.user.User"
+ }
diff --cc protocol/dubbo/dubbo_protocol.go
index cc67569,2d8a20c..915ee74
--- a/protocol/dubbo/dubbo_protocol.go
+++ b/protocol/dubbo/dubbo_protocol.go
@@@ -29,11 -35,14 +35,13 @@@ import
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/protocol"
- "github.com/apache/dubbo-go/protocol/dubbo/impl/remoting"
+ "github.com/apache/dubbo-go/protocol/invocation"
+ "github.com/apache/dubbo-go/remoting"
+ "github.com/apache/dubbo-go/remoting/getty"
)
-// dubbo protocol constant
const (
- // DUBBO ...
+ // DUBBO is dubbo protocol name
DUBBO = "dubbo"
)
@@@ -72,20 -91,14 +89,14 @@@ func (dp *DubboProtocol) Export(invoke
return exporter
}
-// nolint
+// Refer create dubbo service reference.
func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker {
- //default requestTimeout
- var requestTimeout = config.GetConsumerConfig().RequestTimeout
-
- requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout)
- if t, err := time.ParseDuration(requestTimeoutStr); err == nil {
- requestTimeout = t
+ exchangeClient := getExchangeClient(url)
+ if exchangeClient == nil {
+ logger.Warnf("can't dial the server: %+v", url.Location)
+ return nil
}
-
- invoker := NewDubboInvoker(url, remoting.NewClient(remoting.Options{
- ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
- RequestTimeout: requestTimeout,
- }))
+ invoker := NewDubboInvoker(url, exchangeClient)
dp.SetInvokers(invoker)
logger.Infof("Refer service: %s", url.String())
return invoker
diff --cc protocol/dubbo/dubbo_protocol_test.go
index 1915a07,55ab0fe..07b890f
--- a/protocol/dubbo/dubbo_protocol_test.go
+++ b/protocol/dubbo/dubbo_protocol_test.go
@@@ -29,15 -26,56 +26,58 @@@ import
)
import (
- "github.com/stretchr/testify/assert"
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/protocol"
+ "github.com/apache/dubbo-go/remoting/getty"
)
+ func init() {
+ getty.SetServerConfig(getty.ServerConfig{
+ SessionNumber: 700,
+ SessionTimeout: "20s",
+ GettySessionParam: getty.GettySessionParam{
+ CompressEncoding: false,
+ TcpNoDelay: true,
+ TcpKeepAlive: true,
+ KeepAlivePeriod: "120s",
+ TcpRBufSize: 262144,
+ TcpWBufSize: 65536,
+ PkgWQSize: 512,
+ TcpReadTimeout: "1s",
+ TcpWriteTimeout: "5s",
+ WaitTimeout: "1s",
+ MaxMsgLen: 10240000000,
+ SessionName: "server",
+ }})
+ getty.SetClientConf(getty.ClientConfig{
+ ConnectionNum: 1,
+ HeartbeatPeriod: "3s",
+ SessionTimeout: "20s",
+ PoolTTL: 600,
+ PoolSize: 64,
+ GettySessionParam: getty.GettySessionParam{
+ CompressEncoding: false,
+ TcpNoDelay: true,
+ TcpKeepAlive: true,
+ KeepAlivePeriod: "120s",
+ TcpRBufSize: 262144,
+ TcpWBufSize: 65536,
+ PkgWQSize: 512,
+ TcpReadTimeout: "4s",
+ TcpWriteTimeout: "5s",
+ WaitTimeout: "1s",
+ MaxMsgLen: 10240000000,
+ SessionName: "client",
+ },
+ })
+ }
func TestDubboProtocol_Export(t *testing.T) {
- srvCfg := remoting.GetDefaultServerConfig()
- remoting.SetServerConfig(srvCfg)
++ srvCfg := getty.GetDefaultServerConfig()
++ getty.SetServerConfig(srvCfg)
// Export
proto := GetProtocol()
- url, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&" +
+ url, err := common.NewURL("dubbo://127.0.0.1:20094/com.ikurento.user.UserProvider?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
@@@ -76,12 -114,24 +116,26 @@@
assert.False(t, ok)
}
+ func TestDubboProtocol_Refer_No_connect(t *testing.T) {
+ // Refer
+ proto := GetProtocol()
+ url, err := common.NewURL("dubbo://127.0.0.1:20096/com.ikurento.user.UserProvider?anyhost=true&" +
+ "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
+ "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
+ "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
+ "side=provider&timeout=3000×tamp=1556509797245")
+ assert.NoError(t, err)
+ invoker := proto.Refer(url)
+ assert.Nil(t, invoker)
+ }
+
func TestDubboProtocol_Refer(t *testing.T) {
- cliCfg := remoting.GetDefaultClientConfig()
- remoting.SetClientConf(cliCfg)
++ cliCfg := getty.GetDefaultClientConfig()
++ getty.SetClientConf(cliCfg)
// Refer
proto := GetProtocol()
- url, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&" +
+
+ url, err := common.NewURL("dubbo://127.0.0.1:20091/com.ikurento.user.UserProvider?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
diff --cc protocol/dubbo/impl/codec_test.go
index c93f307,0000000..92f2f2e
mode 100644,000000..100644
--- a/protocol/dubbo/impl/codec_test.go
+++ b/protocol/dubbo/impl/codec_test.go
@@@ -1,197 -1,0 +1,197 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package impl
+
+import (
+ "testing"
+ "time"
+)
+
+import (
+ "github.com/golang/protobuf/proto"
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "github.com/apache/dubbo-go/common/constant"
+ pb "github.com/apache/dubbo-go/protocol/dubbo/impl/proto"
+)
+
+func TestDubboPackage_MarshalAndUnmarshal(t *testing.T) {
+ pkg := NewDubboPackage(nil)
+ pkg.Body = []interface{}{"a"}
+ pkg.Header.Type = PackageHeartbeat
+ pkg.Header.SerialID = constant.S_Hessian2
+ pkg.Header.ID = 10086
+ pkg.SetSerializer(HessianSerializer{})
+
+ // heartbeat
+ data, err := pkg.Marshal()
+ assert.NoError(t, err)
+
+ pkgres := NewDubboPackage(data)
+ pkgres.SetSerializer(HessianSerializer{})
+
+ pkgres.Body = []interface{}{}
+ err = pkgres.Unmarshal()
+ assert.NoError(t, err)
+ assert.Equal(t, PackageHeartbeat|PackageRequest|PackageRequest_TwoWay, pkgres.Header.Type)
+ assert.Equal(t, constant.S_Hessian2, pkgres.Header.SerialID)
+ assert.Equal(t, int64(10086), pkgres.Header.ID)
+ assert.Equal(t, 0, len(pkgres.Body.([]interface{})))
+
+ // request
+ pkg.Header.Type = PackageRequest
+ pkg.Service.Interface = "Service"
+ pkg.Service.Path = "path"
+ pkg.Service.Version = "2.6"
+ pkg.Service.Method = "Method"
+ pkg.Service.Timeout = time.Second
+ data, err = pkg.Marshal()
+ assert.NoError(t, err)
+
+ pkgres = NewDubboPackage(data)
+ pkgres.SetSerializer(HessianSerializer{})
+ pkgres.Body = make([]interface{}, 7)
+ err = pkgres.Unmarshal()
+ reassembleBody := pkgres.GetBody().(map[string]interface{})
+ assert.NoError(t, err)
+ assert.Equal(t, PackageRequest, pkgres.Header.Type)
+ assert.Equal(t, constant.S_Hessian2, pkgres.Header.SerialID)
+ assert.Equal(t, int64(10086), pkgres.Header.ID)
+ assert.Equal(t, "2.0.2", reassembleBody["dubboVersion"].(string))
+ assert.Equal(t, "path", pkgres.Service.Path)
+ assert.Equal(t, "2.6", pkgres.Service.Version)
+ assert.Equal(t, "Method", pkgres.Service.Method)
+ assert.Equal(t, "Ljava/lang/String;", reassembleBody["argsTypes"].(string))
+ assert.Equal(t, []interface{}{"a"}, reassembleBody["args"])
+ assert.Equal(t, map[string]string{"dubbo": "2.0.2", "interface": "Service", "path": "path", "timeout": "1000", "version": "2.6"}, reassembleBody["attachments"].(map[string]string))
+}
+
+func TestDubboPackage_Protobuf_Serialization_Request(t *testing.T) {
+ pkg := NewDubboPackage(nil)
+ pkg.Body = []interface{}{"a"}
+ pkg.Header.Type = PackageHeartbeat
+ pkg.Header.SerialID = constant.S_Proto
+ pkg.Header.ID = 10086
+ pkg.SetSerializer(ProtoSerializer{})
+
+ // heartbeat
+ data, err := pkg.Marshal()
+ assert.NoError(t, err)
+
+ pkgres := NewDubboPackage(data)
+ pkgres.SetSerializer(HessianSerializer{})
+
+ pkgres.Body = []interface{}{}
+ err = pkgres.Unmarshal()
+ assert.NoError(t, err)
+ assert.Equal(t, PackageHeartbeat|PackageRequest|PackageRequest_TwoWay, pkgres.Header.Type)
+ assert.Equal(t, constant.S_Proto, pkgres.Header.SerialID)
+ assert.Equal(t, int64(10086), pkgres.Header.ID)
+ assert.Equal(t, 0, len(pkgres.Body.([]interface{})))
+
+ // request
+ pkg.Header.Type = PackageRequest
+ pkg.Service.Interface = "Service"
+ pkg.Service.Path = "path"
+ pkg.Service.Version = "2.6"
+ pkg.Service.Method = "Method"
+ pkg.Service.Timeout = time.Second
+ pkg.SetBody([]interface{}{&pb.StringValue{Value: "hello world"}})
+ data, err = pkg.Marshal()
+ assert.NoError(t, err)
+
+ pkgres = NewDubboPackage(data)
+ pkgres.SetSerializer(ProtoSerializer{})
+ err = pkgres.Unmarshal()
+ assert.NoError(t, err)
+ body, ok := pkgres.Body.(map[string]interface{})
+ assert.Equal(t, ok, true)
+ req, ok := body["args"].([]interface{})
+ assert.Equal(t, ok, true)
+ // protobuf rpc just has exact one parameter
+ assert.Equal(t, len(req), 1)
+ argsBytes, ok := req[0].([]byte)
- assert.Equal(t, ok, true)
++ assert.Equal(t, true, ok)
+ sv := pb.StringValue{}
+ buf := proto.NewBuffer(argsBytes)
+ err = buf.Unmarshal(&sv)
+ assert.NoError(t, err)
+ assert.Equal(t, sv.Value, "hello world")
+}
+
+func TestDubboCodec_Protobuf_Serialization_Response(t *testing.T) {
+ {
+ pkg := NewDubboPackage(nil)
+ pkg.Header.Type = PackageResponse
+ pkg.Header.SerialID = constant.S_Proto
+ pkg.Header.ID = 10086
+ pkg.SetSerializer(ProtoSerializer{})
+ pkg.SetBody(&pb.StringValue{Value: "hello world"})
+
+ // heartbeat
+ data, err := pkg.Marshal()
+ assert.NoError(t, err)
+
+ pkgres := NewDubboPackage(data)
+ pkgres.SetSerializer(ProtoSerializer{})
+
+ pkgres.SetBody(&pb.StringValue{})
+ err = pkgres.Unmarshal()
+
+ assert.NoError(t, err)
+ assert.Equal(t, pkgres.Header.Type, PackageResponse)
+ assert.Equal(t, constant.S_Proto, pkgres.Header.SerialID)
+ assert.Equal(t, int64(10086), pkgres.Header.ID)
+
+ res, ok := pkgres.Body.(*pb.StringValue)
+ assert.Equal(t, ok, true)
+ assert.Equal(t, res.Value, "hello world")
+ }
+
+ // with attachments
+ {
+ attas := make(map[string]string)
+ attas["k1"] = "test"
+ resp := NewResponsePayload(&pb.StringValue{Value: "attachments"}, nil, attas)
+ p := NewDubboPackage(nil)
+ p.Header.Type = PackageResponse
+ p.Header.SerialID = constant.S_Proto
+ p.SetSerializer(ProtoSerializer{})
+ p.SetBody(resp)
+ data, err := p.Marshal()
+ assert.NoError(t, err)
+
+ pkgres := NewDubboPackage(data)
+ pkgres.Header.Type = PackageResponse
+ pkgres.Header.SerialID = constant.S_Proto
+ pkgres.Header.ID = 10086
+ pkgres.SetSerializer(ProtoSerializer{})
+
+ resAttachment := make(map[string]string)
+ resBody := &pb.StringValue{}
+ pkgres.SetBody(NewResponsePayload(resBody, nil, resAttachment))
+
+ err = pkgres.Unmarshal()
+ assert.NoError(t, err)
+ assert.Equal(t, "attachments", resBody.Value)
+ assert.Equal(t, "test", resAttachment["k1"])
+ }
+
+}
diff --cc protocol/dubbo/impl/const.go
index 06e73fa,0000000..c2f4006
mode 100644,000000..100644
--- a/protocol/dubbo/impl/const.go
+++ b/protocol/dubbo/impl/const.go
@@@ -1,243 -1,0 +1,243 @@@
- package impl
-
- import (
- "reflect"
- "regexp"
-
- "github.com/pkg/errors"
- )
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
++package impl
++
++import (
++ "reflect"
++ "regexp"
++
++ "github.com/pkg/errors"
++)
++
+const (
+ DUBBO = "dubbo"
+)
+
+const (
+ mask = byte(127)
+ flag = byte(128)
+)
+
+const (
+ // Zero : byte zero
+ Zero = byte(0x00)
+)
+
+// constansts
+const (
+ TAG_READ = int32(-1)
+ ASCII_GAP = 32
+ CHUNK_SIZE = 4096
+ BC_BINARY = byte('B') // final chunk
+ BC_BINARY_CHUNK = byte('A') // non-final chunk
+
+ BC_BINARY_DIRECT = byte(0x20) // 1-byte length binary
+ BINARY_DIRECT_MAX = byte(0x0f)
+ BC_BINARY_SHORT = byte(0x34) // 2-byte length binary
+ BINARY_SHORT_MAX = 0x3ff // 0-1023 binary
+
+ BC_DATE = byte(0x4a) // 64-bit millisecond UTC date
+ BC_DATE_MINUTE = byte(0x4b) // 32-bit minute UTC date
+
+ BC_DOUBLE = byte('D') // IEEE 64-bit double
+
+ BC_DOUBLE_ZERO = byte(0x5b)
+ BC_DOUBLE_ONE = byte(0x5c)
+ BC_DOUBLE_BYTE = byte(0x5d)
+ BC_DOUBLE_SHORT = byte(0x5e)
+ BC_DOUBLE_MILL = byte(0x5f)
+
+ BC_FALSE = byte('F') // boolean false
+
+ BC_INT = byte('I') // 32-bit int
+
+ INT_DIRECT_MIN = -0x10
+ INT_DIRECT_MAX = byte(0x2f)
+ BC_INT_ZERO = byte(0x90)
+
+ INT_BYTE_MIN = -0x800
+ INT_BYTE_MAX = 0x7ff
+ BC_INT_BYTE_ZERO = byte(0xc8)
+
+ BC_END = byte('Z')
+
+ INT_SHORT_MIN = -0x40000
+ INT_SHORT_MAX = 0x3ffff
+ BC_INT_SHORT_ZERO = byte(0xd4)
+
+ BC_LIST_VARIABLE = byte(0x55)
+ BC_LIST_FIXED = byte('V')
+ BC_LIST_VARIABLE_UNTYPED = byte(0x57)
+ BC_LIST_FIXED_UNTYPED = byte(0x58)
+ _listFixedTypedLenTagMin = byte(0x70)
+ _listFixedTypedLenTagMax = byte(0x77)
+ _listFixedUntypedLenTagMin = byte(0x78)
+ _listFixedUntypedLenTagMax = byte(0x7f)
+
+ BC_LIST_DIRECT = byte(0x70)
+ BC_LIST_DIRECT_UNTYPED = byte(0x78)
+ LIST_DIRECT_MAX = byte(0x7)
+
+ BC_LONG = byte('L') // 64-bit signed integer
+ LONG_DIRECT_MIN = -0x08
+ LONG_DIRECT_MAX = byte(0x0f)
+ BC_LONG_ZERO = byte(0xe0)
+
+ LONG_BYTE_MIN = -0x800
+ LONG_BYTE_MAX = 0x7ff
+ BC_LONG_BYTE_ZERO = byte(0xf8)
+
+ LONG_SHORT_MIN = -0x40000
+ LONG_SHORT_MAX = 0x3ffff
+ BC_LONG_SHORT_ZERO = byte(0x3c)
+
+ BC_LONG_INT = byte(0x59)
+
+ BC_MAP = byte('M')
+ BC_MAP_UNTYPED = byte('H')
+
+ BC_NULL = byte('N') // x4e
+
+ BC_OBJECT = byte('O')
+ BC_OBJECT_DEF = byte('C')
+
+ BC_OBJECT_DIRECT = byte(0x60)
+ OBJECT_DIRECT_MAX = byte(0x0f)
+
+ BC_REF = byte(0x51)
+
+ BC_STRING = byte('S') // final string
+ BC_STRING_CHUNK = byte('R') // non-final string
+
+ BC_STRING_DIRECT = byte(0x00)
+ STRING_DIRECT_MAX = byte(0x1f)
+ BC_STRING_SHORT = byte(0x30)
+ STRING_SHORT_MAX = 0x3ff
+
+ BC_TRUE = byte('T')
+
+ P_PACKET_CHUNK = byte(0x4f)
+ P_PACKET = byte('P')
+
+ P_PACKET_DIRECT = byte(0x80)
+ PACKET_DIRECT_MAX = byte(0x7f)
+
+ P_PACKET_SHORT = byte(0x70)
+ PACKET_SHORT_MAX = 0xfff
+ ARRAY_STRING = "[string"
+ ARRAY_INT = "[int"
+ ARRAY_DOUBLE = "[double"
+ ARRAY_FLOAT = "[float"
+ ARRAY_BOOL = "[boolean"
+ ARRAY_LONG = "[long"
+
+ PATH_KEY = "path"
+ GROUP_KEY = "group"
+ INTERFACE_KEY = "interface"
+ VERSION_KEY = "version"
+ TIMEOUT_KEY = "timeout"
+
+ STRING_NIL = ""
+ STRING_TRUE = "true"
+ STRING_FALSE = "false"
+ STRING_ZERO = "0.0"
+ STRING_ONE = "1.0"
+)
+
+// ResponsePayload related consts
+const (
+ Response_OK byte = 20
+ Response_CLIENT_TIMEOUT byte = 30
+ Response_SERVER_TIMEOUT byte = 31
+ Response_BAD_REQUEST byte = 40
+ Response_BAD_RESPONSE byte = 50
+ Response_SERVICE_NOT_FOUND byte = 60
+ Response_SERVICE_ERROR byte = 70
+ Response_SERVER_ERROR byte = 80
+ Response_CLIENT_ERROR byte = 90
+
+ // According to "java dubbo" There are two cases of response:
+ // 1. with attachments
+ // 2. no attachments
+ RESPONSE_WITH_EXCEPTION int32 = 0
+ RESPONSE_VALUE int32 = 1
+ RESPONSE_NULL_VALUE int32 = 2
+ RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS int32 = 3
+ RESPONSE_VALUE_WITH_ATTACHMENTS int32 = 4
+ RESPONSE_NULL_VALUE_WITH_ATTACHMENTS int32 = 5
+)
+
+/**
+ * the dubbo protocol header length is 16 Bytes.
+ * the first 2 Bytes is magic code '0xdabb'
+ * the next 1 Byte is message flags, in which its 16-20 bit is serial id, 21 for event, 22 for two way, 23 for request/response flag
+ * the next 1 Bytes is response state.
+ * the next 8 Bytes is package DI.
+ * the next 4 Bytes is package length.
+ **/
+const (
+ // header length.
+ HEADER_LENGTH = 16
+
+ // magic header
+ MAGIC = uint16(0xdabb)
+ MAGIC_HIGH = byte(0xda)
+ MAGIC_LOW = byte(0xbb)
+
+ // message flag.
+ FLAG_REQUEST = byte(0x80)
+ FLAG_TWOWAY = byte(0x40)
+ FLAG_EVENT = byte(0x20) // for heartbeat
+ SERIAL_MASK = 0x1f
+
+ DUBBO_VERSION = "2.5.4"
+ DUBBO_VERSION_KEY = "dubbo"
+ DEFAULT_DUBBO_PROTOCOL_VERSION = "2.0.2" // Dubbo RPC protocol version, for compatibility, it must not be between 2.0.10 ~ 2.6.2
+ LOWEST_VERSION_FOR_RESPONSE_ATTACHMENT = 2000200
+ DEFAULT_LEN = 8388608 // 8 * 1024 * 1024 default body max length
+)
+
+// regular
+const (
+ JAVA_IDENT_REGEX = "(?:[_$a-zA-Z][_$a-zA-Z0-9]*)"
+ CLASS_DESC = "(?:L" + JAVA_IDENT_REGEX + "(?:\\/" + JAVA_IDENT_REGEX + ")*;)"
+ ARRAY_DESC = "(?:\\[+(?:(?:[VZBCDFIJS])|" + CLASS_DESC + "))"
+ DESC_REGEX = "(?:(?:[VZBCDFIJS])|" + CLASS_DESC + "|" + ARRAY_DESC + ")"
+)
+
+// Dubbo request response related consts
+var (
+ DubboRequestHeaderBytesTwoWay = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_REQUEST | FLAG_TWOWAY}
+ DubboRequestHeaderBytes = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_REQUEST}
+ DubboResponseHeaderBytes = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, Zero, Response_OK}
+ DubboRequestHeartbeatHeader = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_REQUEST | FLAG_TWOWAY | FLAG_EVENT}
+ DubboResponseHeartbeatHeader = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_EVENT}
+)
+
+// Error part
+var (
+ ErrHeaderNotEnough = errors.New("header buffer too short")
+ ErrBodyNotEnough = errors.New("body buffer too short")
+ ErrJavaException = errors.New("got java exception")
+ ErrIllegalPackage = errors.New("illegal package!")
+)
+
+// DescRegex ...
+var DescRegex, _ = regexp.Compile(DESC_REGEX)
+
+var NilValue = reflect.Zero(reflect.TypeOf((*interface{})(nil)).Elem())
diff --cc protocol/dubbo/impl/proto.go
index ea1c55d,0000000..2622620
mode 100644,000000..100644
--- a/protocol/dubbo/impl/proto.go
+++ b/protocol/dubbo/impl/proto.go
@@@ -1,450 -1,0 +1,454 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package impl
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
++ "github.com/apache/dubbo-go/common/logger"
+ "io"
+ "reflect"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+)
+
+import (
+ "github.com/golang/protobuf/proto"
+ "github.com/matttproud/golang_protobuf_extensions/pbutil"
+ "github.com/pkg/errors"
+)
+
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
+ pb "github.com/apache/dubbo-go/protocol/dubbo/impl/proto"
+)
+
+type ProtoSerializer struct{}
+
+func (p ProtoSerializer) Marshal(pkg DubboPackage) ([]byte, error) {
+ if pkg.IsHeartBeat() {
+ return []byte{byte('N')}, nil
+ }
+ if pkg.Body == nil {
+ return nil, errors.New("package body should not be nil")
+ }
+ if pkg.IsRequest() {
+ return marshalRequestProto(pkg)
+ }
+ return marshalResponseProto(pkg)
+}
+
+func (p ProtoSerializer) Unmarshal(data []byte, pkg *DubboPackage) error {
+ if pkg.IsRequest() {
+ return unmarshalRequestProto(data, pkg)
+ }
+ return unmarshalResponseProto(data, pkg)
+}
+
+func unmarshalResponseProto(data []byte, pkg *DubboPackage) error {
+ if pkg.Body == nil {
+ pkg.SetBody(NewResponsePayload(nil, nil, nil))
+ }
+ response := EnsureResponsePayload(pkg.Body)
+ buf := bytes.NewBuffer(data)
+
+ var responseType int32
+ if err := readByte(buf, &responseType); err != nil {
+ return err
+ }
+
+ hasAttachments := false
+ hasException := false
+ switch responseType {
+ case RESPONSE_VALUE_WITH_ATTACHMENTS:
+ hasAttachments = true
+ case RESPONSE_WITH_EXCEPTION:
+ hasException = true
+ case RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
+ hasAttachments = true
+ hasException = true
+ }
+ if hasException {
+ throwable := pb.ThrowableProto{}
+ if err := readObject(buf, &throwable); err != nil {
+ return err
+ }
+ // generate error only with error message
+ response.Exception = errors.New(throwable.OriginalMessage)
+ } else {
+ // read response body
+ protoMsg, ok := response.RspObj.(proto.Message)
+ if !ok {
+ return errors.New("response rspobj not protobuf message")
+ }
+ if err := readObject(buf, protoMsg); err != nil {
+ return err
+ }
+ }
+
+ if hasAttachments {
+ atta := pb.Map{}
+ if err := readObject(buf, &atta); err != nil {
+ return err
+ }
+ if response.Attachments == nil {
+ response.Attachments = atta.Attachments
+ } else {
+ for k, v := range atta.Attachments {
+ response.Attachments[k] = v
+ }
+ }
+
+ }
+ return nil
+}
+
+func unmarshalRequestProto(data []byte, pkg *DubboPackage) error {
+ var dubboVersion string
+ var svcPath string
+ var svcVersion string
+ var svcMethod string
+ buf := bytes.NewBuffer(data)
+ if err := readUTF(buf, &dubboVersion); err != nil {
+ return err
+ }
+ if err := readUTF(buf, &svcPath); err != nil {
+ return err
+ }
+ if err := readUTF(buf, &svcVersion); err != nil {
+ return err
+ }
+ if err := readUTF(buf, &svcMethod); err != nil {
+ return err
+ }
+ // NOTE: protobuf rpc just have exact one parameter, while golang doesn't need this field
+ var argsType string
+ if err := readUTF(buf, &argsType); err != nil {
+ return err
+ }
+ // get raw body bytes for proxy methods to unmarshal
+ var protoMsgLength int
+ if err := readDelimitedLength(buf, &protoMsgLength); err != nil {
+ return err
+ }
+ argBytes := make([]byte, protoMsgLength)
+ if n, err := buf.Read(argBytes); err != nil {
+ if n != protoMsgLength {
+ return errors.New("illegal msg length")
+ }
+ return err
+ }
+ arg := getRegisterMessage(argsType)
- err := proto.Unmarshal(argBytes, arg.Interface().(JavaProto))
- if err != nil {
- panic(err)
++ if !arg.IsZero() {
++ err := proto.Unmarshal(argBytes, arg.Interface().(JavaProto))
++ if err != nil {
++ panic(err)
++ }
+ }
+
+ m := &pb.Map{}
+ if err := readObject(buf, m); err != nil {
+ return err
+ }
+ svc := Service{}
+ svc.Version = svcVersion
+ svc.Method = svcMethod
+ // just as hessian
+ svc.Path = svcPath
+ if svc.Path == "" && len(m.Attachments[constant.PATH_KEY]) > 0 {
+ svc.Path = m.Attachments[constant.PATH_KEY]
+ }
+
+ if _, ok := m.Attachments[constant.INTERFACE_KEY]; ok {
+ svc.Interface = m.Attachments[constant.INTERFACE_KEY]
+ } else {
+ svc.Interface = svc.Path
+ }
+ pkg.SetService(svc)
+ pkg.SetBody(map[string]interface{}{
+ "dubboVersion": dubboVersion,
+ "args": []interface{}{arg.Interface()},
+ "service": common.ServiceMap.GetService(DUBBO, svc.Path), // path as a key
+ "attachments": m.Attachments,
+ })
+
+ return nil
+}
+
+func marshalRequestProto(pkg DubboPackage) ([]byte, error) {
+ request := EnsureRequestPayload(pkg.Body)
+ args, ok := request.Params.([]interface{})
+ buf := bytes.NewBuffer(make([]byte, 0))
+ if !ok {
+ return nil, errors.New("proto buffer args should be marshaled in []byte")
+ }
+ // NOTE: protobuf rpc just has exact one parameter
+ if len(args) != 1 {
+ return nil, errors.New("illegal protobuf service, len(arg) should equal 1")
+ }
+ // dubbo version
+ if err := writeUTF(buf, DUBBO_VERSION); err != nil {
+ return nil, err
+ }
+ // service path
+ if err := writeUTF(buf, pkg.Service.Path); err != nil {
+ return nil, err
+ }
+ // service version
+ if err := writeUTF(buf, pkg.Service.Version); err != nil {
+ return nil, err
+ }
+ // service method
+ if err := writeUTF(buf, pkg.Service.Method); err != nil {
+ return nil, err
+ }
+ // parameter types desc
+ v := reflect.ValueOf(args[0])
+ mv := v.MethodByName("JavaClassName")
+ if mv.IsValid() {
+ javaCls := mv.Call([]reflect.Value{})
+ if len(javaCls) != 1 {
+ return nil, errors.New("JavaStringName method should return exact 1 result")
+ }
+ javaClsStr, ok := javaCls[0].Interface().(string)
+ if !ok {
+ return nil, errors.New("JavaClassName method should return string")
+ }
+ if err := writeUTF(buf, getJavaArgType(javaClsStr)); err != nil {
+ return nil, err
+ }
+ } else {
+ // defensive code
+ if err := writeUTF(buf, ""); err != nil {
+ return nil, err
+ }
+ }
+ // consumer args
+ protoMsg := args[0].(proto.Message)
+ if err := writeObject(buf, protoMsg); err != nil {
+ return nil, err
+ }
+ // attachments
+ atta := make(map[string]string)
+ atta[PATH_KEY] = pkg.Service.Path
+ atta[VERSION_KEY] = pkg.Service.Version
+ if len(pkg.Service.Group) > 0 {
+ atta[GROUP_KEY] = pkg.Service.Group
+ }
+ if len(pkg.Service.Interface) > 0 {
+ atta[INTERFACE_KEY] = pkg.Service.Interface
+ }
+ if pkg.Service.Timeout != 0 {
+ atta[TIMEOUT_KEY] = strconv.Itoa(int(pkg.Service.Timeout / time.Millisecond))
+ }
+ m := pb.Map{Attachments: atta}
+ if err := writeObject(buf, &m); err != nil {
+ return nil, err
+ }
+ return buf.Bytes(), nil
+}
+
+func marshalResponseProto(pkg DubboPackage) ([]byte, error) {
+ response := EnsureResponsePayload(pkg.Body)
+ buf := bytes.NewBuffer(make([]byte, 0))
+ responseType := RESPONSE_VALUE
+ hasAttachments := false
+ if response.Attachments != nil {
+ responseType = RESPONSE_VALUE_WITH_ATTACHMENTS
+ hasAttachments = true
+ } else {
+ responseType = RESPONSE_VALUE
+ }
+ if response.Exception != nil {
+ if hasAttachments {
+ responseType = RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS
+ } else {
+ responseType = RESPONSE_WITH_EXCEPTION
+ }
+ }
+ // write response type
+ if err := writeByte(buf, responseType); err != nil {
+ return nil, err
+ }
+ if response.Exception != nil {
+ // deal with exception
+ throwable := pb.ThrowableProto{OriginalMessage: response.Exception.Error()}
+ if err := writeObject(buf, &throwable); err != nil {
+ return nil, err
+ }
+ } else {
+ res, ok := response.RspObj.(proto.Message)
+ if !ok {
+ return nil, errors.New("proto buffer params should be marshaled in proto.Message")
+ }
+ // response body
+ if err := writeObject(buf, res); err != nil {
+ return nil, err
+ }
+ }
+
+ if hasAttachments {
+ attachments := pb.Map{Attachments: response.Attachments}
+ if err := writeObject(buf, &attachments); err != nil {
+ return nil, err
+ }
+ }
+ return buf.Bytes(), nil
+}
+
+func init() {
+ SetSerializer("protobuf", ProtoSerializer{})
+}
+
+func getJavaArgType(javaClsName string) string {
+ return fmt.Sprintf("L%s;", strings.ReplaceAll(javaClsName, ".", "/"))
+}
+
+func writeUTF(writer io.Writer, value string) error {
+ _, err := pbutil.WriteDelimited(writer, &pb.StringValue{Value: value})
+ return err
+}
+
+func writeObject(writer io.Writer, value proto.Message) error {
+ _, err := pbutil.WriteDelimited(writer, value)
+ return err
+}
+
+func writeByte(writer io.Writer, v int32) error {
+ i32v := &pb.Int32Value{Value: v}
+ _, err := pbutil.WriteDelimited(writer, i32v)
+ return err
+}
+
+func readUTF(reader io.Reader, value *string) error {
+ sv := &pb.StringValue{}
+ _, err := pbutil.ReadDelimited(reader, sv)
+ if err != nil {
+ return err
+ }
+ *value = sv.Value
+ return nil
+}
+
+func readObject(reader io.Reader, value proto.Message) error {
+ _, err := pbutil.ReadDelimited(reader, value)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+// just as java protobuf serialize
+func readByte(reader io.Reader, value *int32) error {
+ i32v := &pb.Int32Value{}
+ _, err := pbutil.ReadDelimited(reader, i32v)
+ if err != nil {
+ return err
+ }
+ *value = i32v.Value
+ return nil
+}
+
+//
+func readDelimitedLength(reader io.Reader, length *int) error {
+ var headerBuf [binary.MaxVarintLen32]byte
+ var bytesRead, varIntBytes int
+ var messageLength uint64
+ for varIntBytes == 0 { // i.e. no varint has been decoded yet.
+ if bytesRead >= len(headerBuf) {
+ return errors.New("invalid varint32 encountered")
+ }
+ // We have to read byte by byte here to avoid reading more bytes
+ // than required. Each read byte is appended to what we have
+ // read before.
+ newBytesRead, err := reader.Read(headerBuf[bytesRead : bytesRead+1])
+ if newBytesRead == 0 {
+ if err != nil {
+ return err
+ }
+ // A Reader should not return (0, nil), but if it does,
+ // it should be treated as no-op (according to the
+ // Reader contract). So let's go on...
+ continue
+ }
+ bytesRead += newBytesRead
+ // Now present everything read so far to the varint decoder and
+ // see if a varint can be decoded already.
+ messageLength, varIntBytes = proto.DecodeVarint(headerBuf[:bytesRead])
+ }
+ *length = int(messageLength)
+ return nil
+}
+
+type JavaProto interface {
+ JavaClassName() string
+ proto.Message
+}
+
+type Register struct {
+ sync.RWMutex
+ registry map[string]reflect.Type
+}
+
+var (
+ register = Register{
+ registry: make(map[string]reflect.Type),
+ }
+)
+
+func RegisterMessage(msg JavaProto) {
+ register.Lock()
+ defer register.Unlock()
+
+ name := msg.JavaClassName()
+ name = getJavaArgType(name)
+
+ if e, ok := register.registry[name]; ok {
+ panic(fmt.Sprintf("msg: %v has been registered. existed: %v", msg.JavaClassName(), e))
+ }
+
+ register.registry[name] = typeOfMessage(msg)
+}
+
+func getRegisterMessage(sig string) reflect.Value {
+ register.Lock()
+ defer register.Unlock()
+
+ t, ok := register.registry[sig]
+ if !ok {
- panic(fmt.Sprintf("registry dose not have for svc: %v", sig))
++ logger.Error(fmt.Sprintf("registry dose not have for svc: %v", sig))
++ return NilValue
+ }
+ return reflect.New(t)
+}
+
+func typeOfMessage(o proto.Message) reflect.Type {
+ v := reflect.ValueOf(o)
+ switch v.Kind() {
+ case reflect.Struct:
+ return v.Type()
+ case reflect.Ptr:
+ return v.Elem().Type()
+ }
+
+ return reflect.TypeOf(o)
+}
diff --cc remoting/getty/getty_client_test.go
index 0000000,3dcc6b3..1272654
mode 000000,100644..100644
--- a/remoting/getty/getty_client_test.go
+++ b/remoting/getty/getty_client_test.go
@@@ -1,0 -1,497 +1,497 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package getty
+
+ import (
+ "bytes"
+ "context"
+ "reflect"
+ "sync"
+ "testing"
+ "time"
+ )
+
+ import (
+ hessian "github.com/apache/dubbo-go-hessian2"
+ perrors "github.com/pkg/errors"
+ "github.com/stretchr/testify/assert"
+ )
+
+ import (
+ "github.com/apache/dubbo-go/common"
+ . "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/proxy/proxy_factory"
+ "github.com/apache/dubbo-go/config"
+ "github.com/apache/dubbo-go/protocol"
+ "github.com/apache/dubbo-go/protocol/invocation"
+ "github.com/apache/dubbo-go/remoting"
+ )
+
+ func TestRunSuite(t *testing.T) {
+ svr, url := InitTest(t)
+ client := getClient(url)
+ testRequestOneWay(t, svr, url, client)
+ testClient_Call(t, svr, url, client)
+ testClient_AsyncCall(t, svr, url, client)
+
+ svr.Stop()
+ }
+
+ func testRequestOneWay(t *testing.T, svr *Server, url common.URL, client *Client) {
+
+ request := remoting.NewRequest("2.0.2")
+ up := &UserProvider{}
+ invocation := createInvocation("GetUser", nil, nil, []interface{}{[]interface{}{"1", "username"}, up},
+ []reflect.Value{reflect.ValueOf([]interface{}{"1", "username"}), reflect.ValueOf(up)})
+ attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"}
+ setAttachment(invocation, attachment)
+ request.Data = invocation
+ request.Event = false
+ request.TwoWay = false
+ //user := &User{}
+ err := client.Request(request, 3*time.Second, nil)
+ assert.NoError(t, err)
+ }
+
+ func createInvocation(methodName string, callback interface{}, reply interface{}, arguments []interface{},
+ parameterValues []reflect.Value) *invocation.RPCInvocation {
+ return invocation.NewRPCInvocationWithOptions(invocation.WithMethodName(methodName),
+ invocation.WithArguments(arguments), invocation.WithReply(reply),
+ invocation.WithCallBack(callback), invocation.WithParameterValues(parameterValues))
+ }
+
+ func setAttachment(invocation *invocation.RPCInvocation, attachments map[string]string) {
+ for key, value := range attachments {
+ invocation.SetAttachments(key, value)
+ }
+ }
+
+ func getClient(url common.URL) *Client {
+ client := NewClient(Options{
+ ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+ })
+
+ exchangeClient := remoting.NewExchangeClient(url, client, 5*time.Second, false)
+ client.SetExchangeClient(exchangeClient)
+ client.Connect(url)
+ client.SetResponseHandler(exchangeClient)
+ return client
+ }
+
+ func testClient_Call(t *testing.T, svr *Server, url common.URL, c *Client) {
+ c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
+
+ testGetBigPkg(t, c)
+ testGetUser(t, c)
+ testGetUser0(t, c)
+ testGetUser1(t, c)
+ testGetUser2(t, c)
+ testGetUser3(t, c)
+ testGetUser4(t, c)
+ testGetUser5(t, c)
+ testGetUser6(t, c)
+ testGetUser61(t, c)
+
+ }
+ func testGetBigPkg(t *testing.T, c *Client) {
+ var (
+ user *User
+ err error
+ )
+
+ user = &User{}
+ request := remoting.NewRequest("2.0.2")
+ invocation := createInvocation("GetBigPkg", nil, nil, []interface{}{[]interface{}{nil}, user},
+ []reflect.Value{reflect.ValueOf([]interface{}{nil}), reflect.ValueOf(user)})
+ attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"}
+ setAttachment(invocation, attachment)
+ request.Data = invocation
+ request.Event = false
+ request.TwoWay = true
+ pendingResponse := remoting.NewPendingResponse(request.ID)
+ pendingResponse.Reply = user
+ remoting.AddPendingResponse(pendingResponse)
+ err = c.Request(request, 8*time.Second, pendingResponse)
+ assert.NoError(t, err)
+ assert.NotEqual(t, "", user.Id)
+ assert.NotEqual(t, "", user.Name)
+ }
+ func testGetUser(t *testing.T, c *Client) {
+ var (
+ user *User
+ err error
+ )
+ user = &User{}
+ request := remoting.NewRequest("2.0.2")
+ invocation := createInvocation("GetUser", nil, nil, []interface{}{"1", "username"},
+ []reflect.Value{reflect.ValueOf("1"), reflect.ValueOf("username")})
+ attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"}
+ setAttachment(invocation, attachment)
+ request.Data = invocation
+ request.Event = false
+ request.TwoWay = true
+ pendingResponse := remoting.NewPendingResponse(request.ID)
+ pendingResponse.Reply = user
+ remoting.AddPendingResponse(pendingResponse)
+ err = c.Request(request, 3*time.Second, pendingResponse)
+ assert.NoError(t, err)
+ assert.Equal(t, User{Id: "1", Name: "username"}, *user)
+ }
+
+ func testGetUser0(t *testing.T, c *Client) {
+ var (
+ user *User
+ err error
+ )
+ user = &User{}
+ request := remoting.NewRequest("2.0.2")
+ invocation := createInvocation("GetUser0", nil, nil, []interface{}{"1", nil, "username"},
+ []reflect.Value{reflect.ValueOf("1"), reflect.ValueOf(nil), reflect.ValueOf("username")})
+ attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"}
+ setAttachment(invocation, attachment)
+ request.Data = invocation
+ request.Event = false
+ request.TwoWay = true
+ rsp := remoting.NewPendingResponse(request.ID)
+ rsp.SetResponse(remoting.NewResponse(request.ID, "2.0.2"))
+ remoting.AddPendingResponse(rsp)
+ rsp.Reply = user
+ err = c.Request(request, 3*time.Second, rsp)
+ assert.NoError(t, err)
+ assert.Equal(t, User{Id: "1", Name: "username"}, *user)
+ }
+ func testGetUser1(t *testing.T, c *Client) {
+ var (
+ err error
+ )
+ request := remoting.NewRequest("2.0.2")
+ invocation := createInvocation("GetUser1", nil, nil, []interface{}{},
+ []reflect.Value{})
+ attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"}
+ setAttachment(invocation, attachment)
+ request.Data = invocation
+ request.Event = false
+ request.TwoWay = true
+ pendingResponse := remoting.NewPendingResponse(request.ID)
+ user := &User{}
+ pendingResponse.Reply = user
+ remoting.AddPendingResponse(pendingResponse)
+ err = c.Request(request, 3*time.Second, pendingResponse)
+ assert.NoError(t, err)
+ }
+ func testGetUser2(t *testing.T, c *Client) {
+ var (
+ err error
+ )
+ request := remoting.NewRequest("2.0.2")
+ invocation := createInvocation("GetUser2", nil, nil, []interface{}{},
+ []reflect.Value{})
+ attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"}
+ setAttachment(invocation, attachment)
+ request.Data = invocation
+ request.Event = false
+ request.TwoWay = true
+ pendingResponse := remoting.NewPendingResponse(request.ID)
+ remoting.AddPendingResponse(pendingResponse)
+ err = c.Request(request, 3*time.Second, pendingResponse)
+ assert.EqualError(t, err, "error")
+ }
+ func testGetUser3(t *testing.T, c *Client) {
+ var (
+ err error
+ )
+ request := remoting.NewRequest("2.0.2")
+ invocation := createInvocation("GetUser3", nil, nil, []interface{}{},
+ []reflect.Value{})
+ attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"}
+ setAttachment(invocation, attachment)
+ request.Data = invocation
+ request.Event = false
+ request.TwoWay = true
+ pendingResponse := remoting.NewPendingResponse(request.ID)
+ user2 := []interface{}{}
+ pendingResponse.Reply = &user2
+ remoting.AddPendingResponse(pendingResponse)
+ err = c.Request(request, 3*time.Second, pendingResponse)
+ assert.NoError(t, err)
+ assert.Equal(t, &User{Id: "1", Name: "username"}, user2[0])
+ }
+ func testGetUser4(t *testing.T, c *Client) {
+ var (
+ err error
+ )
+ request := remoting.NewRequest("2.0.2")
+ invocation := invocation.NewRPCInvocation("GetUser4", []interface{}{[]interface{}{"1", "username"}}, nil)
+ attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"}
+ setAttachment(invocation, attachment)
+ request.Data = invocation
+ request.Event = false
+ request.TwoWay = true
+ pendingResponse := remoting.NewPendingResponse(request.ID)
+ user2 := []interface{}{}
+ pendingResponse.Reply = &user2
+ remoting.AddPendingResponse(pendingResponse)
+ err = c.Request(request, 3*time.Second, pendingResponse)
+ assert.NoError(t, err)
+ assert.Equal(t, &User{Id: "1", Name: "username"}, user2[0])
+ }
+
+ func testGetUser5(t *testing.T, c *Client) {
+ var (
+ err error
+ )
+ request := remoting.NewRequest("2.0.2")
+ invocation := invocation.NewRPCInvocation("GetUser5", []interface{}{map[interface{}]interface{}{"id": "1", "name": "username"}}, nil)
+ attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"}
+ setAttachment(invocation, attachment)
+ request.Data = invocation
+ request.Event = false
+ request.TwoWay = true
+ pendingResponse := remoting.NewPendingResponse(request.ID)
+ user3 := map[interface{}]interface{}{}
+ pendingResponse.Reply = &user3
+ remoting.AddPendingResponse(pendingResponse)
+ err = c.Request(request, 3*time.Second, pendingResponse)
+ assert.NoError(t, err)
+ assert.NotNil(t, user3)
+ assert.Equal(t, &User{Id: "1", Name: "username"}, user3["key"])
+ }
+
+ func testGetUser6(t *testing.T, c *Client) {
+ var (
+ user *User
+ err error
+ )
+ user = &User{}
+ request := remoting.NewRequest("2.0.2")
+ invocation := invocation.NewRPCInvocation("GetUser6", []interface{}{0}, nil)
+ attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"}
+ setAttachment(invocation, attachment)
+ request.Data = invocation
+ request.Event = false
+ request.TwoWay = true
+ pendingResponse := remoting.NewPendingResponse(request.ID)
+ pendingResponse.Reply = user
+ remoting.AddPendingResponse(pendingResponse)
+ err = c.Request(request, 3*time.Second, pendingResponse)
+ assert.NoError(t, err)
+ assert.Equal(t, User{Id: "", Name: ""}, *user)
+ }
+
+ func testGetUser61(t *testing.T, c *Client) {
+ var (
+ user *User
+ err error
+ )
+ user = &User{}
+ request := remoting.NewRequest("2.0.2")
+ invocation := invocation.NewRPCInvocation("GetUser6", []interface{}{1}, nil)
+ attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"}
+ setAttachment(invocation, attachment)
+ request.Data = invocation
+ request.Event = false
+ request.TwoWay = true
+ pendingResponse := remoting.NewPendingResponse(request.ID)
+ pendingResponse.Reply = user
+ remoting.AddPendingResponse(pendingResponse)
+ err = c.Request(request, 3*time.Second, pendingResponse)
+ assert.NoError(t, err)
+ assert.Equal(t, User{Id: "1", Name: ""}, *user)
+ }
+
+ func testClient_AsyncCall(t *testing.T, svr *Server, url common.URL, client *Client) {
+ user := &User{}
+ lock := sync.Mutex{}
+ request := remoting.NewRequest("2.0.2")
+ invocation := createInvocation("GetUser0", nil, nil, []interface{}{"4", nil, "username"},
+ []reflect.Value{reflect.ValueOf("4"), reflect.ValueOf(nil), reflect.ValueOf("username")})
+ attachment := map[string]string{INTERFACE_KEY: "com.ikurento.user.UserProvider"}
+ setAttachment(invocation, attachment)
+ request.Data = invocation
+ request.Event = false
+ request.TwoWay = true
+ rsp := remoting.NewPendingResponse(request.ID)
+ rsp.SetResponse(remoting.NewResponse(request.ID, "2.0.2"))
+ remoting.AddPendingResponse(rsp)
+ rsp.Reply = user
+ rsp.Callback = func(response common.CallbackResponse) {
+ r := response.(remoting.AsyncCallbackResponse)
+ rst := *r.Reply.(*remoting.Response).Result.(*protocol.RPCResult)
+ assert.Equal(t, User{Id: "4", Name: "username"}, *(rst.Rest.(*User)))
+ lock.Unlock()
+ }
+ lock.Lock()
+ err := client.Request(request, 3*time.Second, rsp)
+ assert.NoError(t, err)
+ assert.Equal(t, User{}, *user)
+ time.Sleep(1 * time.Second)
+ }
+
+ func InitTest(t *testing.T) (*Server, common.URL) {
+
+ hessian.RegisterPOJO(&User{})
+ remoting.RegistryCodec("dubbo", &DubboTestCodec{})
+
- methods, err := common.ServiceMap.Register("dubbo", &UserProvider{})
++ methods, err := common.ServiceMap.Register("", "dubbo", &UserProvider{})
+ assert.NoError(t, err)
+ assert.Equal(t, "GetBigPkg,GetUser,GetUser0,GetUser1,GetUser2,GetUser3,GetUser4,GetUser5,GetUser6", methods)
+
+ // config
+ SetClientConf(ClientConfig{
+ ConnectionNum: 2,
+ HeartbeatPeriod: "5s",
+ SessionTimeout: "20s",
+ PoolTTL: 600,
+ PoolSize: 64,
+ GettySessionParam: GettySessionParam{
+ CompressEncoding: false,
+ TcpNoDelay: true,
+ TcpKeepAlive: true,
+ KeepAlivePeriod: "120s",
+ TcpRBufSize: 262144,
+ TcpWBufSize: 65536,
+ PkgWQSize: 512,
+ TcpReadTimeout: "4s",
+ TcpWriteTimeout: "5s",
+ WaitTimeout: "1s",
+ MaxMsgLen: 10240000000,
+ SessionName: "client",
+ },
+ })
+ assert.NoError(t, clientConf.CheckValidity())
+ SetServerConfig(ServerConfig{
+ SessionNumber: 700,
+ SessionTimeout: "20s",
+ GettySessionParam: GettySessionParam{
+ CompressEncoding: false,
+ TcpNoDelay: true,
+ TcpKeepAlive: true,
+ KeepAlivePeriod: "120s",
+ TcpRBufSize: 262144,
+ TcpWBufSize: 65536,
+ PkgWQSize: 512,
+ TcpReadTimeout: "1s",
+ TcpWriteTimeout: "5s",
+ WaitTimeout: "1s",
+ MaxMsgLen: 10240000000,
+ SessionName: "server",
+ }})
+ assert.NoError(t, srvConf.CheckValidity())
+
+ url, err := common.NewURL("dubbo://127.0.0.1:20060/UserProvider?anyhost=true&" +
+ "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
+ "environment=dev&interface=com.ikurento.user.UserProvider&ip=127.0.0.1&methods=GetUser%2C&" +
+ "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
+ "side=provider&timeout=3000×tamp=1556509797245&bean.name=UserProvider")
+ // init server
+ userProvider := &UserProvider{}
- common.ServiceMap.Register(url.Protocol, userProvider)
++ common.ServiceMap.Register("", url.Protocol, userProvider)
+ invoker := &proxy_factory.ProxyInvoker{
+ BaseInvoker: *protocol.NewBaseInvoker(url),
+ }
+ handler := func(invocation *invocation.RPCInvocation) protocol.RPCResult {
+ //result := protocol.RPCResult{}
+ r := invoker.Invoke(context.Background(), invocation)
+ result := protocol.RPCResult{
+ Err: r.Error(),
+ Rest: r.Result(),
+ Attrs: r.Attachments(),
+ }
+ return result
+ }
+ server := NewServer(url, handler)
+ server.Start()
+
+ time.Sleep(time.Second * 2)
+
+ return server, url
+ }
+
+ //////////////////////////////////
+ // provider
+ //////////////////////////////////
+
+ type (
+ User struct {
+ Id string `json:"id"`
+ Name string `json:"name"`
+ }
+
+ UserProvider struct {
+ user map[string]User
+ }
+ )
+
+ // size:4801228
+ func (u *UserProvider) GetBigPkg(ctx context.Context, req []interface{}, rsp *User) error {
+ argBuf := new(bytes.Buffer)
+ for i := 0; i < 400; i++ {
+ argBuf.WriteString("击鼓其镗,踊跃用兵。土国城漕,我独南行。从孙子仲,平陈与宋。不我以归,忧心有忡。爰居爰处?爰丧其马?于以求之?于林之下。死生契阔,与子成说。执子之手,与子偕老。于嗟阔兮,不我活兮。于嗟洵兮,不我信兮。")
+ argBuf.WriteString("击鼓其镗,踊跃用兵。土国城漕,我独南行。从孙子仲,平陈与宋。不我以归,忧心有忡。爰居爰处?爰丧其马?于以求之?于林之下。死生契阔,与子成说。执子之手,与子偕老。于嗟阔兮,不我活兮。于嗟洵兮,不我信兮。")
+ }
+ rsp.Id = argBuf.String()
+ rsp.Name = argBuf.String()
+ return nil
+ }
+
+ func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User) error {
+ rsp.Id = req[0].(string)
+ rsp.Name = req[1].(string)
+ return nil
+ }
+
+ func (u *UserProvider) GetUser0(id string, k *User, name string) (User, error) {
+ return User{Id: id, Name: name}, nil
+ }
+
+ func (u *UserProvider) GetUser1() error {
+ return nil
+ }
+
+ func (u *UserProvider) GetUser2() error {
+ return perrors.New("error")
+ }
+
+ func (u *UserProvider) GetUser3(rsp *[]interface{}) error {
+ *rsp = append(*rsp, User{Id: "1", Name: "username"})
+ return nil
+ }
+
+ func (u *UserProvider) GetUser4(ctx context.Context, req []interface{}) ([]interface{}, error) {
+
+ return []interface{}{User{Id: req[0].([]interface{})[0].(string), Name: req[0].([]interface{})[1].(string)}}, nil
+ }
+
+ func (u *UserProvider) GetUser5(ctx context.Context, req []interface{}) (map[interface{}]interface{}, error) {
+ return map[interface{}]interface{}{"key": User{Id: req[0].(map[interface{}]interface{})["id"].(string), Name: req[0].(map[interface{}]interface{})["name"].(string)}}, nil
+ }
+
+ func (u *UserProvider) GetUser6(id int64) (*User, error) {
+ if id == 0 {
+ return nil, nil
+ }
+ return &User{Id: "1"}, nil
+ }
+
+ func (u *UserProvider) Reference() string {
+ return "UserProvider"
+ }
+
+ func (u User) JavaClassName() string {
+ return "com.ikurento.user.User"
+ }