You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by ji...@apache.org on 2020/03/21 12:21:09 UTC
[dubbo-go] 01/01: feature: support protobuf
This is an automated email from the ASF dual-hosted git repository.
jianhaixu pushed a commit to branch apache-pb
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
commit c37b2d284f0cdecc3dadbc0ede346b592eab3192
Author: skyitachi <sk...@gmail.com>
AuthorDate: Thu Dec 19 10:28:10 2019 +0800
feature: support protobuf
---
common/constant/default.go | 1 +
common/constant/key.go | 1 +
common/constant/serializtion.go | 28 +++
common/extension/serialization.go | 58 +++++
common/url.go | 27 ++
config/service_config.go | 2 +
go.mod | 1 +
protocol/dubbo/client.go | 87 ++++---
protocol/dubbo/codec.go | 301 +++++++++++++++++-----
protocol/dubbo/codec_test.go | 166 ++++++++++--
protocol/dubbo/config.go | 4 +
protocol/dubbo/const.go | 238 ++++++++++++++++++
protocol/dubbo/dubbo_invoker.go | 4 +
protocol/dubbo/dubbo_protocol.go | 1 -
protocol/dubbo/hessian.go | 504 +++++++++++++++++++++++++++++++++++++
protocol/dubbo/listener.go | 98 ++++----
protocol/dubbo/package.go | 198 +++++++++++++++
protocol/dubbo/proto.go | 392 +++++++++++++++++++++++++++++
protocol/dubbo/proto/payload.pb.go | 328 ++++++++++++++++++++++++
protocol/dubbo/proto/payload.proto | 78 ++++++
protocol/dubbo/readwriter.go | 137 +++++-----
protocol/dubbo/request.go | 40 +++
protocol/dubbo/response.go | 46 ++++
protocol/dubbo/serialize.go | 6 +
registry/zookeeper/registry.go | 3 +-
25 files changed, 2511 insertions(+), 238 deletions(-)
diff --git a/common/constant/default.go b/common/constant/default.go
index 3c88915..e445306 100644
--- a/common/constant/default.go
+++ b/common/constant/default.go
@@ -43,6 +43,7 @@ const (
DEFAULT_FAILBACK_TASKS = 100
DEFAULT_REST_CLIENT = "resty"
DEFAULT_REST_SERVER = "go-restful"
+ DEFAULT_SERIALIZATION = HESSIAN2_SERIALIZATION
)
const (
diff --git a/common/constant/key.go b/common/constant/key.go
index 07335be..8c38ebc 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -75,6 +75,7 @@ const (
EXECUTE_REJECTED_EXECUTION_HANDLER_KEY = "execute.limit.rejected.handler"
PROVIDER_SHUTDOWN_FILTER = "pshutdown"
CONSUMER_SHUTDOWN_FILTER = "cshutdown"
+ SERIALIZATION_KEY = "serialization"
)
const (
diff --git a/common/constant/serializtion.go b/common/constant/serializtion.go
new file mode 100644
index 0000000..f27598c
--- /dev/null
+++ b/common/constant/serializtion.go
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package constant
+
+// Serialization IDs carried in the SerialID field of a dubbo package header.
+const (
+ // S_Hessian2 identifies the hessian2 serialization.
+ S_Hessian2 byte = 2
+ // S_Proto identifies the protobuf serialization.
+ S_Proto byte = 21
+)
+
+// Serialization names, matched against the "serialization" URL parameter.
+const (
+ HESSIAN2_SERIALIZATION = "hessian2"
+ PROTOBUF_SERIALIZATION = "protobuf"
+)
diff --git a/common/extension/serialization.go b/common/extension/serialization.go
new file mode 100644
index 0000000..a0b8b88
--- /dev/null
+++ b/common/extension/serialization.go
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package extension
+
+import (
+ "github.com/pkg/errors"
+)
+
+import (
+ "github.com/apache/dubbo-go/common/constant"
+)
+
+var (
+	// serializers holds the registered serializer implementations, keyed by
+	// serialization name.
+	serializers = make(map[string]interface{})
+	// nameMaps translates a wire serialization ID into its serialization name.
+	// It is filled in by its own initializer, so no init function (and no
+	// throw-away empty map) is needed.
+	nameMaps = map[byte]string{
+		constant.S_Hessian2: constant.HESSIAN2_SERIALIZATION,
+		constant.S_Proto:    constant.PROTOBUF_SERIALIZATION,
+	}
+)
+
+// SetSerializer registers serializer under name, overwriting any serializer
+// previously stored for that name.
+// NOTE(review): the backing map is a plain map with no locking — presumably
+// registration only happens during start-up; confirm before calling concurrently.
+func SetSerializer(name string, serializer interface{}) {
+ serializers[name] = serializer
+}
+
+// GetSerializer returns the serializer registered under name, or nil when
+// nothing has been registered for that name.
+func GetSerializer(name string) interface{} {
+ return serializers[name]
+}
+
+// GetSerializerById resolves a wire serialization ID to its registered
+// serializer. It returns an error when the ID has no known serialization name
+// or when no serializer has been registered under that name.
+func GetSerializerById(id byte) (interface{}, error) {
+	name, known := nameMaps[id]
+	if !known {
+		return nil, errors.Errorf("serialId %d not found", id)
+	}
+	if serializer, registered := serializers[name]; registered {
+		return serializer, nil
+	}
+	return nil, errors.Errorf("serialization %s not found", name)
+}
diff --git a/common/url.go b/common/url.go
index ebb648d..3d41dd3 100644
--- a/common/url.go
+++ b/common/url.go
@@ -24,6 +24,7 @@ import (
"math"
"net"
"net/url"
+ "sort"
"strconv"
"strings"
"sync"
@@ -629,3 +630,29 @@ func mergeNormalParam(mergedUrl *URL, referenceUrl *URL, paramKeys []string) []f
}
return methodConfigMergeFcn
}
+
+// ParamsUnescapeEncode encodes params as "k1=v1&k2=v2", sorted by key like
+// url.Values.Encode, but writes keys and values verbatim: URL-reserved
+// characters are NOT escaped (use url.QueryEscape when escaping is wanted).
+// It returns "" for nil or empty params.
+// reference: https://github.com/golang/go.git, src/net/url/url.go, Encode method
+func ParamsUnescapeEncode(params url.Values) string {
+	if len(params) == 0 {
+		return ""
+	}
+	// Length 0 with capacity len(params): a non-zero length would leave
+	// len(params) empty strings in front of the appended keys, and an
+	// empty-string key would then be emitted several times.
+	keys := make([]string, 0, len(params))
+	for k := range params {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+
+	var buf strings.Builder
+	for _, k := range keys {
+		for _, v := range params[k] {
+			if buf.Len() > 0 {
+				buf.WriteByte('&')
+			}
+			buf.WriteString(k)
+			buf.WriteByte('=')
+			buf.WriteString(v)
+		}
+	}
+	return buf.String()
+}
diff --git a/config/service_config.go b/config/service_config.go
index 7d97fa4..3131766 100644
--- a/config/service_config.go
+++ b/config/service_config.go
@@ -57,6 +57,7 @@ type ServiceConfig struct {
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
+ Serialization string `yaml:"serialization" json:"serialization" property:"serialization"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
Token string `yaml:"token" json:"token,omitempty" property:"token"`
AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"`
@@ -193,6 +194,7 @@ func (c *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(constant.GROUP_KEY, c.Group)
urlMap.Set(constant.VERSION_KEY, c.Version)
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))
+ urlMap.Set(constant.SERIALIZATION_KEY, srvconfig.Serialization)
// application info
urlMap.Set(constant.APPLICATION_KEY, providerConfig.ApplicationConfig.Name)
urlMap.Set(constant.ORGANIZATION_KEY, providerConfig.ApplicationConfig.Organization)
diff --git a/go.mod b/go.mod
index 83091cf..e9a9f0a 100644
--- a/go.mod
+++ b/go.mod
@@ -36,6 +36,7 @@ require (
github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect
github.com/magiconair/properties v1.8.1
+ github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/mitchellh/mapstructure v1.1.2
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb
diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go
index 5ec7db5..07cac4a 100644
--- a/protocol/dubbo/client.go
+++ b/protocol/dubbo/client.go
@@ -25,7 +25,6 @@ import (
)
import (
- hessian "github.com/apache/dubbo-go-hessian2"
"github.com/dubbogo/getty"
gxsync "github.com/dubbogo/gost/sync"
perrors "github.com/pkg/errors"
@@ -137,6 +136,7 @@ type Client struct {
sequence atomic.Uint64
pendingResponses *sync.Map
+ codec DubboCodec
}
// NewClient ...
@@ -160,6 +160,7 @@ func NewClient(opt Options) *Client {
opts: opt,
pendingResponses: new(sync.Map),
conf: *clientConf,
+ codec: DubboCodec{},
}
c.sequence.Store(initSequence)
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
@@ -178,6 +179,10 @@ type Request struct {
// NewRequest ...
func NewRequest(addr string, svcUrl common.URL, method string, args interface{}, atta map[string]string) *Request {
+ // NOTE: compatible with old versions
+ if svcUrl.GetParam(constant.SERIALIZATION_KEY, "") == "" {
+ svcUrl.SetParam(constant.SERIALIZATION_KEY, constant.DEFAULT_SERIALIZATION)
+ }
return &Request{
addr: addr,
svcUrl: svcUrl,
@@ -225,35 +230,6 @@ func (c *Client) AsyncCall(request *Request, callback common.AsyncCallback, resp
}
func (c *Client) call(ct CallType, request *Request, response *Response, callback common.AsyncCallback) error {
-
- p := &DubboPackage{}
- p.Service.Path = strings.TrimPrefix(request.svcUrl.Path, "/")
- p.Service.Interface = request.svcUrl.GetParam(constant.INTERFACE_KEY, "")
- p.Service.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "")
- p.Service.Group = request.svcUrl.GetParam(constant.GROUP_KEY, "")
- p.Service.Method = request.method
-
- p.Service.Timeout = c.opts.RequestTimeout
- var timeout = request.svcUrl.GetParam(strings.Join([]string{constant.METHOD_KEYS, request.method + constant.RETRIES_KEY}, "."), "")
- if len(timeout) != 0 {
- if t, err := time.ParseDuration(timeout); err == nil {
- p.Service.Timeout = t
- }
- }
-
- p.Header.SerialID = byte(S_Dubbo)
- p.Body = hessian.NewRequest(request.args, request.atta)
-
- var rsp *PendingResponse
- if ct != CT_OneWay {
- p.Header.Type = hessian.PackageRequest_TwoWay
- rsp = NewPendingResponse()
- rsp.response = response
- rsp.callback = callback
- } else {
- p.Header.Type = hessian.PackageRequest
- }
-
var (
err error
session getty.Session
@@ -274,6 +250,37 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac
conn.close()
}()
+ var rsp *PendingResponse
+ svc := Service{}
+ header := DubboHeader{}
+ svc.Path = strings.TrimPrefix(request.svcUrl.Path, "/")
+ svc.Interface = request.svcUrl.GetParam(constant.INTERFACE_KEY, "")
+ svc.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "")
+ svc.Group = request.svcUrl.GetParam(constant.GROUP_KEY, "")
+ svc.Method = request.method
+ svc.Timeout = c.opts.RequestTimeout
+ p := NewClientRequestPackage(header, svc)
+
+ serialization := request.svcUrl.GetParam(constant.SERIALIZATION_KEY, c.conf.Serialization)
+ if serialization == constant.HESSIAN2_SERIALIZATION {
+ p.Header.SerialID = constant.S_Hessian2
+ } else if serialization == constant.PROTOBUF_SERIALIZATION {
+ p.Header.SerialID = constant.S_Proto
+ }
+ p.SetBody(NewRequestPayload(request.args, request.atta))
+
+ if err := loadSerializer(p); err != nil {
+ return err
+ }
+
+ if ct != CT_OneWay {
+ p.Header.Type = PackageRequest_TwoWay
+ rsp = NewPendingResponse()
+ rsp.response = response
+ rsp.callback = callback
+ } else {
+ p.Header.Type = PackageRequest
+ }
if err = c.transfer(session, p, rsp); err != nil {
return perrors.WithStack(err)
}
@@ -324,13 +331,21 @@ func (c *Client) transfer(session getty.Session, pkg *DubboPackage,
sequence = c.sequence.Add(1)
if pkg == nil {
- pkg = &DubboPackage{}
- pkg.Body = hessian.NewRequest([]interface{}{}, nil)
- pkg.Body = []interface{}{}
- pkg.Header.Type = hessian.PackageHeartbeat
- pkg.Header.SerialID = byte(S_Dubbo)
+ // make heartbeat package
+ header := DubboHeader{
+ Type: PackageHeartbeat,
+ SerialID: constant.S_Hessian2,
+ }
+ pkg = NewClientRequestPackage(header, Service{})
+ // SetBody
+ reqPayload := NewRequestPayload([]interface{}{}, nil)
+ pkg.SetBody(reqPayload)
+ // set serializer
+ if err := loadSerializer(pkg); err != nil {
+ return err
+ }
}
- pkg.Header.ID = int64(sequence)
+ pkg.SetID(int64(sequence))
// cond1
if rsp != nil {
diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go
index 76416b2..85bfbbe 100644
--- a/protocol/dubbo/codec.go
+++ b/protocol/dubbo/codec.go
@@ -19,23 +19,37 @@ package dubbo
import (
"bufio"
- "bytes"
- "fmt"
+ "encoding/binary"
"time"
)
import (
+ "github.com/pkg/errors"
+)
+
+import (
"github.com/apache/dubbo-go-hessian2"
"github.com/apache/dubbo-go/common"
- perrors "github.com/pkg/errors"
+ "github.com/apache/dubbo-go/common/logger"
)
-//SerialID serial ID
-type SerialID byte
+// DubboCodec reads and writes dubbo packages on top of a buffered reader.
+type DubboCodec struct {
+ // reader is the source ReadHeader and Read take their bytes from.
+ reader *bufio.Reader
+ // pkgType and bodyLen cache the values parsed by ReadHeader.
+ pkgType PackageType
+ bodyLen int
+ // serializer marshals and unmarshals package bodies; Write rejects a nil
+ // serializer and Read needs one for regular (non-heartbeat) bodies.
+ serializer Serializer
+ // headerRead is set once ReadHeader succeeds, so Read does not parse the header again.
+ headerRead bool
+}
+// Package type flags; ReadHeader combines several of them into one Type with bitwise OR.
const (
- // S_Dubbo dubbo serial id
- S_Dubbo SerialID = 2
+ PackageError = PackageType(0x01)
+ PackageRequest = PackageType(0x02)
+ PackageResponse = PackageType(0x04)
+ PackageHeartbeat = PackageType(0x08)
+ PackageRequest_TwoWay = PackageType(0x10)
+ PackageResponse_Exception = PackageType(0x20)
+ // NOTE(review): 0x2f leaves out the PackageRequest_TwoWay bit (0x10); OR-ing all flags above gives 0x3f — confirm the intended value.
+ PackageType_BitSize = 0x2f
)
//CallType call type
@@ -53,77 +67,252 @@ const (
////////////////////////////////////////////
// dubbo package
////////////////////////////////////////////
-
-// SequenceType ...
type SequenceType int64
-// DubboPackage ...
-type DubboPackage struct {
- Header hessian.DubboHeader
- Service hessian.Service
- Body interface{}
- Err error
-}
+// PackageType ...
+type PackageType int
-func (p DubboPackage) String() string {
- return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body)
-}
+// ReadHeader parses the fixed HEADER_LENGTH-byte dubbo header from the codec's
+// reader into header and consumes those bytes.
+//
+// It fills header.SerialID, header.Type (flags are OR-ed into the value already
+// present), header.ResponseStatus (responses only), header.ID and
+// header.BodyLen, and records the type and body length on the codec.
+//
+// It returns hessian.ErrHeaderNotEnough when the reader's buffer is smaller
+// than a header, hessian.ErrIllegalPackage when the magic number or the body
+// length is invalid, and hessian.ErrBodyNotEnough when fewer than BodyLen bytes
+// are buffered behind the header.
+// NOTE(review): on ErrBodyNotEnough the header bytes are already discarded and
+// headerRead stays false — confirm callers do not retry on the same codec.
+func (c *DubboCodec) ReadHeader(header *DubboHeader) error {
+	if c.reader.Size() < HEADER_LENGTH {
+		return hessian.ErrHeaderNotEnough
+	}
+	buf, err := c.reader.Peek(HEADER_LENGTH)
+	if err != nil { // fewer than HEADER_LENGTH bytes could be read
+		return errors.WithStack(err)
+	}
+	_, err = c.reader.Discard(HEADER_LENGTH)
+	if err != nil { // not expected after a successful Peek of the same length
+		return errors.WithStack(err)
+	}
-// Marshal ...
-func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
- codec := hessian.NewHessianCodec(nil)
+	//// read header
+	// Both magic bytes must match; a mismatch in either one marks the package
+	// as illegal (the previous && only rejected packages with both bytes wrong).
+	if buf[0] != MAGIC_HIGH || buf[1] != MAGIC_LOW {
+		return hessian.ErrIllegalPackage
+	}
- pkg, err := codec.Write(p.Service, p.Header, p.Body)
- if err != nil {
- return nil, perrors.WithStack(err)
+	// Header{serialization id(5 bit), event, two way, req/response}
+	if header.SerialID = buf[2] & SERIAL_MASK; header.SerialID == Zero {
+		return errors.Errorf("serialization ID:%v", header.SerialID)
+	}
+
+	flag := buf[2] & FLAG_EVENT
+	if flag != Zero {
+		header.Type |= PackageHeartbeat
+	}
+	flag = buf[2] & FLAG_REQUEST
+	if flag != Zero {
+		header.Type |= PackageRequest
+		flag = buf[2] & FLAG_TWOWAY
+		if flag != Zero {
+			header.Type |= PackageRequest_TwoWay
+		}
+	} else {
+		header.Type |= PackageResponse
+		header.ResponseStatus = buf[3]
+		if header.ResponseStatus != Response_OK {
+			header.Type |= PackageResponse_Exception
+		}
+	}
+
+	// Header{req id}
+	header.ID = int64(binary.BigEndian.Uint64(buf[4:]))
+
+	// Header{body len}
+	header.BodyLen = int(binary.BigEndian.Uint32(buf[12:]))
+	if header.BodyLen < 0 {
+		return hessian.ErrIllegalPackage
}
- return bytes.NewBuffer(pkg), nil
+	c.pkgType = header.Type
+	c.bodyLen = header.BodyLen
+
+	if c.reader.Buffered() < c.bodyLen {
+		return hessian.ErrBodyNotEnough
+	}
+	c.headerRead = true
+	return nil
+}
+
+// EncodeHeader builds the header bytes for p: a header template chosen by
+// p.Header.Type, with the serialization ID OR-ed into byte 2 and the request ID
+// written at offset 4. For heartbeats a zero ResponseStatus selects the request
+// template and any other status the response template.
+//
+// It returns nil when p.Header.Type matches none of the handled types; before,
+// that case indexed an empty slice and panicked.
+func (c *DubboCodec) EncodeHeader(p DubboPackage) []byte {
+	header := p.Header
+	var bs []byte
+	switch header.Type {
+	case PackageHeartbeat:
+		if header.ResponseStatus == Zero {
+			bs = append(bs, hessian.DubboRequestHeartbeatHeader[:]...)
+		} else {
+			bs = append(bs, hessian.DubboResponseHeartbeatHeader[:]...)
+		}
+	case PackageResponse:
+		bs = append(bs, hessian.DubboResponseHeaderBytes[:]...)
+		if header.ResponseStatus != 0 {
+			bs[3] = header.ResponseStatus
+		}
+	case PackageRequest:
+		// one-way request: Write accepts this type alongside PackageRequest_TwoWay
+		bs = append(bs, hessian.DubboRequestHeaderBytes[:]...)
+	case PackageRequest_TwoWay:
+		bs = append(bs, hessian.DubboRequestHeaderBytesTwoWay[:]...)
+	default:
+		// no template for this type, so there is nothing to patch below
+		return nil
+	}
+	bs[2] |= header.SerialID & hessian.SERIAL_MASK
+	binary.BigEndian.PutUint64(bs[4:], uint64(header.ID))
+	return bs
}
-// Unmarshal ...
-func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error {
- // fix issue https://github.com/apache/dubbo-go/issues/380
- bufLen := buf.Len()
- if bufLen < hessian.HEADER_LENGTH {
- return perrors.WithStack(hessian.ErrHeaderNotEnough)
+// Write encodes p into its wire bytes using the codec's serializer.
+// Heartbeats are packed as a request when ResponseStatus is zero and as a
+// response otherwise; PackageRequest and PackageRequest_TwoWay go through
+// packRequest, PackageResponse through packResponse.
+// It returns an error when no serializer is set or when header.Type is none of
+// those exact values.
+func (c *DubboCodec) Write(p DubboPackage) ([]byte, error) {
+ // a serializer is required for every package type
+ if c.serializer == nil {
+ return nil, errors.New("serializer should not be nil")
}
+ header := p.Header
+ switch header.Type {
+ case PackageHeartbeat:
+ if header.ResponseStatus == Zero {
+ return packRequest(p, c.serializer)
+ }
+ return packResponse(p, c.serializer)
- codec := hessian.NewHessianCodec(bufio.NewReaderSize(buf, bufLen))
+ case PackageRequest, PackageRequest_TwoWay:
+ return packRequest(p, c.serializer)
- // read header
- err := codec.ReadHeader(&p.Header)
- if err != nil {
- return perrors.WithStack(err)
+ case PackageResponse:
+ return packResponse(p, c.serializer)
+
+ default:
+ return nil, errors.Errorf("Unrecognised message type: %v", header.Type)
}
+}
- if len(opts) != 0 { // for client
- client, ok := opts[0].(*Client)
+// Read decodes one package from the codec's reader into p.
+//
+// The header is parsed first unless ReadHeader already succeeded on this codec.
+// The body is then peeked (not consumed) and handled by package kind:
+//   - response with exception: the body is hessian-decoded and stored in the
+//     ResponsePayload's Exception field; if p.Body is not a *ResponsePayload
+//     the exception is returned as the error instead;
+//   - heartbeat: the body is ignored;
+//   - anything else: the body is handed to the codec's serializer, and an
+//     error is returned when none is set.
+func (c *DubboCodec) Read(p *DubboPackage) error {
+	if !c.headerRead {
+		if err := c.ReadHeader(&p.Header); err != nil {
+			return err
+		}
+	}
+	body, err := c.reader.Peek(p.GetBodyLen())
+	if err != nil {
+		return err
+	}
+	if p.IsResponseWithException() {
+		logger.Infof("response with exception: %+v", p.Header)
+		decoder := hessian.NewDecoder(body)
+		exception, err := decoder.Decode()
+		if err != nil {
+			return errors.WithStack(err)
+		}
+		// %v prints a string exception exactly like %s did, and does not
+		// panic when the decoded value is not a string (the former
+		// exception.(string) assertion did).
+		javaErr := errors.Errorf("java exception:%v", exception)
+		rsp, ok := p.Body.(*ResponsePayload)
if !ok {
- return perrors.Errorf("opts[0] is not of type *Client")
+			return javaErr
}
+		rsp.Exception = javaErr
+		return nil
+	}
+	if p.IsHeartBeat() {
+		// heartbeat no need to unmarshal contents
+		return nil
+	}
+	if c.serializer == nil {
+		return errors.New("codec serializer is nil")
+	}
+	return c.serializer.Unmarshal(body, p)
+}
- if p.Header.Type&hessian.PackageRequest != 0x00 {
- // size of this array must be '7'
- // https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272
- p.Body = make([]interface{}, 7)
- } else {
- pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID))
- if !ok {
- return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
- }
- p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).response.reply}
+// SetSerializer sets the serializer the codec uses to marshal and unmarshal package bodies.
+func (c *DubboCodec) SetSerializer(serializer Serializer) {
+ c.serializer = serializer
+}
+
+// packRequest builds the wire bytes of a request package: a header template
+// chosen by header.Type, patched with the serialization ID, the request ID and
+// the body length, followed by the body.
+// A heartbeat carries the single byte 'N' as its body; every other request body
+// is produced by serializer.Marshal. It returns an error when marshalling fails
+// or the body is longer than DEFAULT_LEN.
+func packRequest(p DubboPackage, serializer Serializer) ([]byte, error) {
+ var (
+ byteArray []byte
+ pkgLen int
+ )
+
+ header := p.Header
+
+ //////////////////////////////////////////
+ // byteArray
+ //////////////////////////////////////////
+ // header template (magic and flags); any type other than heartbeat or
+ // two-way falls back to the plain request header
+ switch header.Type {
+ case PackageHeartbeat:
+ byteArray = append(byteArray, DubboRequestHeartbeatHeader[:]...)
+ case PackageRequest_TwoWay:
+ byteArray = append(byteArray, DubboRequestHeaderBytesTwoWay[:]...)
+ default:
+ byteArray = append(byteArray, DubboRequestHeaderBytes[:]...)
+ }
+
+ // serialization id, two way flag, event, request/response flag
+ // SerialID is id of serialization approach in java dubbo
+ byteArray[2] |= header.SerialID & SERIAL_MASK
+ // request id
+ binary.BigEndian.PutUint64(byteArray[4:], uint64(header.ID))
+
+ //////////////////////////////////////////
+ // body
+ //////////////////////////////////////////
+ if p.IsHeartBeat() {
+ byteArray = append(byteArray, byte('N'))
+ pkgLen = 1
+ } else {
+ body, err := serializer.Marshal(p)
+ if err != nil {
+ return nil, err
}
+ pkgLen = len(body)
+ if pkgLen > int(DEFAULT_LEN) { // 8M
+ return nil, errors.Errorf("Data length %d too large, max payload %d", pkgLen, DEFAULT_LEN)
+ }
+ byteArray = append(byteArray, body...)
+ }
+ // body length goes into the header at offset 12
+ binary.BigEndian.PutUint32(byteArray[12:], uint32(pkgLen))
+ return byteArray, nil
+}
+
+// packResponse builds the wire bytes of a response package: the heartbeat or
+// regular response header template, patched with the serialization ID, a
+// non-zero response status, the request ID and the body length, followed by
+// the body produced by serializer.Marshal (heartbeats are marshalled too).
+// It returns an error when marshalling fails or the body is longer than DEFAULT_LEN.
+func packResponse(p DubboPackage, serializer Serializer) ([]byte, error) {
+ var (
+ byteArray []byte
+ )
+ header := p.Header
+ hb := p.IsHeartBeat()
+
+ // header template (magic and flags)
+ if hb {
+ byteArray = append(byteArray, DubboResponseHeartbeatHeader[:]...)
+ } else {
+ byteArray = append(byteArray, DubboResponseHeaderBytes[:]...)
+ }
+ // set serialID, identify serialization types, eg: fastjson->6, hessian2->2
+ byteArray[2] |= header.SerialID & SERIAL_MASK
+ // response status; a zero status keeps the value already in the template
+ if header.ResponseStatus != 0 {
+ byteArray[3] = header.ResponseStatus
}
- // read body
- err = codec.ReadBody(p.Body)
- return perrors.WithStack(err)
+ // request id
+ binary.BigEndian.PutUint64(byteArray[4:], uint64(header.ID))
+
+ // body
+ body, err := serializer.Marshal(p)
+ if err != nil {
+ return nil, err
+ }
+
+ pkgLen := len(body)
+ if pkgLen > int(DEFAULT_LEN) { // 8M
+ return nil, errors.Errorf("Data length %d too large, max payload %d", pkgLen, DEFAULT_LEN)
+ }
+ // byteArray{body length}
+ binary.BigEndian.PutUint32(byteArray[12:], uint32(pkgLen))
+ byteArray = append(byteArray, body...)
+ return byteArray, nil
}
-////////////////////////////////////////////
-// PendingResponse
-////////////////////////////////////////////
+// NewDubboCodec returns a codec that reads from reader. No header has been
+// read yet and no serializer is attached; every field other than the reader
+// starts at its zero value.
+func NewDubboCodec(reader *bufio.Reader) *DubboCodec {
+	codec := new(DubboCodec)
+	codec.reader = reader
+	return codec
+}
// PendingResponse ...
type PendingResponse struct {
diff --git a/protocol/dubbo/codec_test.go b/protocol/dubbo/codec_test.go
index 5dc71f0..e488df7 100644
--- a/protocol/dubbo/codec_test.go
+++ b/protocol/dubbo/codec_test.go
@@ -18,39 +18,47 @@
package dubbo
import (
- "bytes"
"testing"
"time"
)
import (
hessian "github.com/apache/dubbo-go-hessian2"
+ "github.com/golang/protobuf/proto"
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
+import (
+ "github.com/apache/dubbo-go/common/constant"
+ pb "github.com/apache/dubbo-go/protocol/dubbo/proto"
+)
+
func TestDubboPackage_MarshalAndUnmarshal(t *testing.T) {
- pkg := &DubboPackage{}
+ pkg := NewDubboPackage(nil)
pkg.Body = []interface{}{"a"}
- pkg.Header.Type = hessian.PackageHeartbeat
- pkg.Header.SerialID = byte(S_Dubbo)
+ pkg.Header.Type = PackageHeartbeat
+ pkg.Header.SerialID = constant.S_Hessian2
pkg.Header.ID = 10086
+ pkg.SetSerializer(HessianSerializer{})
// heartbeat
data, err := pkg.Marshal()
assert.NoError(t, err)
- pkgres := &DubboPackage{}
+ pkgres := NewDubboPackage(data)
+ pkgres.SetSerializer(HessianSerializer{})
+
pkgres.Body = []interface{}{}
- err = pkgres.Unmarshal(data)
+ err = pkgres.Unmarshal()
assert.NoError(t, err)
- assert.Equal(t, hessian.PackageHeartbeat|hessian.PackageRequest|hessian.PackageRequest_TwoWay, pkgres.Header.Type)
- assert.Equal(t, byte(S_Dubbo), pkgres.Header.SerialID)
+ assert.Equal(t, PackageHeartbeat|PackageRequest|PackageRequest_TwoWay, pkgres.Header.Type)
+ assert.Equal(t, constant.S_Hessian2, pkgres.Header.SerialID)
assert.Equal(t, int64(10086), pkgres.Header.ID)
assert.Equal(t, 0, len(pkgres.Body.([]interface{})))
// request
- pkg.Header.Type = hessian.PackageRequest
+ pkg.Header.Type = PackageRequest
pkg.Service.Interface = "Service"
pkg.Service.Path = "path"
pkg.Service.Version = "2.6"
@@ -59,25 +67,139 @@ func TestDubboPackage_MarshalAndUnmarshal(t *testing.T) {
data, err = pkg.Marshal()
assert.NoError(t, err)
- pkgres = &DubboPackage{}
+ pkgres = NewDubboPackage(data)
+ pkgres.SetSerializer(HessianSerializer{})
pkgres.Body = make([]interface{}, 7)
- err = pkgres.Unmarshal(data)
+ err = pkgres.Unmarshal()
+ reassembleBody := pkgres.GetBody().(map[string]interface{})
+ assert.NoError(t, err)
+ assert.Equal(t, PackageRequest, pkgres.Header.Type)
+ assert.Equal(t, constant.S_Hessian2, pkgres.Header.SerialID)
+ assert.Equal(t, int64(10086), pkgres.Header.ID)
+ assert.Equal(t, "2.0.2", reassembleBody["dubboVersion"].(string))
+ assert.Equal(t, "path", pkgres.Service.Path)
+ assert.Equal(t, "2.6", pkgres.Service.Version)
+ assert.Equal(t, "Method", pkgres.Service.Method)
+ assert.Equal(t, "Ljava/lang/String;", reassembleBody["argsTypes"].(string))
+ assert.Equal(t, []interface{}{"a"}, reassembleBody["args"])
+ assert.Equal(t, map[string]string{"dubbo": "2.0.2", "interface": "Service", "path": "path", "timeout": "1000", "version": "2.6"}, reassembleBody["attachments"].(map[string]string))
+}
+
+// TestDubboPackage_Protobuf_Serialization_Request round-trips a heartbeat and
+// a single-argument request through the protobuf serializer.
+func TestDubboPackage_Protobuf_Serialization_Request(t *testing.T) {
+	pkg := NewDubboPackage(nil)
+	pkg.Body = []interface{}{"a"}
+	pkg.Header.Type = PackageHeartbeat
+	pkg.Header.SerialID = constant.S_Proto
+	pkg.Header.ID = 10086
+	pkg.SetSerializer(ProtoSerializer{})
+
+	// heartbeat
+	data, err := pkg.Marshal()
+	assert.NoError(t, err)
+
+	pkgres := NewDubboPackage(data)
+	// decode with the serializer the package was encoded with
+	pkgres.SetSerializer(ProtoSerializer{})
+
+	pkgres.Body = []interface{}{}
+	err = pkgres.Unmarshal()
+	assert.NoError(t, err)
+	assert.Equal(t, PackageHeartbeat|PackageRequest|PackageRequest_TwoWay, pkgres.Header.Type)
+	assert.Equal(t, constant.S_Proto, pkgres.Header.SerialID)
+	assert.Equal(t, int64(10086), pkgres.Header.ID)
+	assert.Equal(t, 0, len(pkgres.Body.([]interface{})))
+
+	// request
+	pkg.Header.Type = PackageRequest
+	pkg.Service.Interface = "Service"
+	pkg.Service.Path = "path"
+	pkg.Service.Version = "2.6"
+	pkg.Service.Method = "Method"
+	pkg.Service.Timeout = time.Second
+	pkg.SetBody([]interface{}{&pb.StringValue{Value: "hello world"}})
+	data, err = pkg.Marshal()
+	assert.NoError(t, err)
+
+	pkgres = NewDubboPackage(data)
+	pkgres.SetSerializer(ProtoSerializer{})
+	err = pkgres.Unmarshal()
+	assert.NoError(t, err)
+	body, ok := pkgres.Body.(map[string]interface{})
+	assert.True(t, ok)
+	req, ok := body["args"].([]interface{})
+	assert.True(t, ok)
+	// protobuf rpc just has exact one parameter
+	assert.Equal(t, 1, len(req))
+	argsBytes, ok := req[0].([]byte)
+	assert.True(t, ok)
+	sv := pb.StringValue{}
+	buf := proto.NewBuffer(argsBytes)
+	err = buf.Unmarshal(&sv)
+	assert.NoError(t, err)
+	assert.Equal(t, "hello world", sv.Value)
+}
+
+// TestDubboCodec_Protobuf_Serialization_Response round-trips protobuf
+// responses: first a bare message body, then a ResponsePayload that also
+// carries attachments.
+func TestDubboCodec_Protobuf_Serialization_Response(t *testing.T) {
+	{
+		pkg := NewDubboPackage(nil)
+		pkg.Header.Type = PackageResponse
+		pkg.Header.SerialID = constant.S_Proto
+		pkg.Header.ID = 10086
+		pkg.SetSerializer(ProtoSerializer{})
+		pkg.SetBody(&pb.StringValue{Value: "hello world"})
+
+		// marshal the response
+		data, err := pkg.Marshal()
+		assert.NoError(t, err)
+
+		pkgres := NewDubboPackage(data)
+		pkgres.SetSerializer(ProtoSerializer{})
+
+		pkgres.SetBody(&pb.StringValue{})
+		err = pkgres.Unmarshal()
+
+		assert.NoError(t, err)
+		assert.Equal(t, PackageResponse, pkgres.Header.Type)
+		assert.Equal(t, constant.S_Proto, pkgres.Header.SerialID)
+		assert.Equal(t, int64(10086), pkgres.Header.ID)
+
+		res, ok := pkgres.Body.(*pb.StringValue)
+		assert.True(t, ok)
+		assert.Equal(t, "hello world", res.Value)
+	}
+
+	// with attachments
+	{
+		attas := make(map[string]string)
+		attas["k1"] = "test"
+		resp := NewResponsePayload(&pb.StringValue{Value: "attachments"}, nil, attas)
+		p := NewDubboPackage(nil)
+		p.Header.Type = PackageResponse
+		p.Header.SerialID = constant.S_Proto
+		p.SetSerializer(ProtoSerializer{})
+		p.SetBody(resp)
+		data, err := p.Marshal()
+		assert.NoError(t, err)
+
+		pkgres := NewDubboPackage(data)
+		pkgres.Header.Type = PackageResponse
+		pkgres.Header.SerialID = constant.S_Proto
+		pkgres.Header.ID = 10086
+		pkgres.SetSerializer(ProtoSerializer{})
+
+		resAttachment := make(map[string]string)
+		resBody := &pb.StringValue{}
+		pkgres.SetBody(NewResponsePayload(resBody, nil, resAttachment))
+
+		err = pkgres.Unmarshal()
+		assert.NoError(t, err)
+		assert.Equal(t, "attachments", resBody.Value)
+		assert.Equal(t, "test", resAttachment["k1"])
+	}
+
}
+// TestIssue380 expects Unmarshal on a zero-value package to fail with hessian.ErrHeaderNotEnough.
func TestIssue380(t *testing.T) {
pkg := &DubboPackage{}
- buf := bytes.NewBuffer([]byte("hello"))
- err := pkg.Unmarshal(buf)
+ err := pkg.Unmarshal()
assert.True(t, perrors.Cause(err) == hessian.ErrHeaderNotEnough)
}
diff --git a/protocol/dubbo/config.go b/protocol/dubbo/config.go
index dbc6989..a2f3e70 100644
--- a/protocol/dubbo/config.go
+++ b/protocol/dubbo/config.go
@@ -91,6 +91,9 @@ type (
// session tcp parameters
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
+
+ // serialization
+ Serialization string `default:"hessian2" yaml:"serialization" json:"serialization"`
}
)
@@ -106,6 +109,7 @@ func GetDefaultClientConfig() ClientConfig {
GrPoolSize: 200,
QueueLen: 64,
QueueNumber: 10,
+ Serialization: "hessian2",
GettySessionParam: GettySessionParam{
CompressEncoding: false,
TcpNoDelay: true,
diff --git a/protocol/dubbo/const.go b/protocol/dubbo/const.go
new file mode 100644
index 0000000..936b8d8
--- /dev/null
+++ b/protocol/dubbo/const.go
@@ -0,0 +1,238 @@
+package dubbo
+
+import (
+ "github.com/pkg/errors"
+ "reflect"
+ "regexp"
+)
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const (
+ mask = byte(127)
+ flag = byte(128)
+)
+
+const (
+ // Zero : byte zero
+ Zero = byte(0x00)
+)
+
+// constants
+const (
+ TAG_READ = int32(-1)
+ ASCII_GAP = 32
+ CHUNK_SIZE = 4096
+ BC_BINARY = byte('B') // final chunk
+ BC_BINARY_CHUNK = byte('A') // non-final chunk
+
+ BC_BINARY_DIRECT = byte(0x20) // 1-byte length binary
+ BINARY_DIRECT_MAX = byte(0x0f)
+ BC_BINARY_SHORT = byte(0x34) // 2-byte length binary
+ BINARY_SHORT_MAX = 0x3ff // 0-1023 binary
+
+ BC_DATE = byte(0x4a) // 64-bit millisecond UTC date
+ BC_DATE_MINUTE = byte(0x4b) // 32-bit minute UTC date
+
+ BC_DOUBLE = byte('D') // IEEE 64-bit double
+
+ BC_DOUBLE_ZERO = byte(0x5b)
+ BC_DOUBLE_ONE = byte(0x5c)
+ BC_DOUBLE_BYTE = byte(0x5d)
+ BC_DOUBLE_SHORT = byte(0x5e)
+ BC_DOUBLE_MILL = byte(0x5f)
+
+ BC_FALSE = byte('F') // boolean false
+
+ BC_INT = byte('I') // 32-bit int
+
+ INT_DIRECT_MIN = -0x10
+ INT_DIRECT_MAX = byte(0x2f)
+ BC_INT_ZERO = byte(0x90)
+
+ INT_BYTE_MIN = -0x800
+ INT_BYTE_MAX = 0x7ff
+ BC_INT_BYTE_ZERO = byte(0xc8)
+
+ BC_END = byte('Z')
+
+ INT_SHORT_MIN = -0x40000
+ INT_SHORT_MAX = 0x3ffff
+ BC_INT_SHORT_ZERO = byte(0xd4)
+
+ BC_LIST_VARIABLE = byte(0x55)
+ BC_LIST_FIXED = byte('V')
+ BC_LIST_VARIABLE_UNTYPED = byte(0x57)
+ BC_LIST_FIXED_UNTYPED = byte(0x58)
+ _listFixedTypedLenTagMin = byte(0x70)
+ _listFixedTypedLenTagMax = byte(0x77)
+ _listFixedUntypedLenTagMin = byte(0x78)
+ _listFixedUntypedLenTagMax = byte(0x7f)
+
+ BC_LIST_DIRECT = byte(0x70)
+ BC_LIST_DIRECT_UNTYPED = byte(0x78)
+ LIST_DIRECT_MAX = byte(0x7)
+
+ BC_LONG = byte('L') // 64-bit signed integer
+ LONG_DIRECT_MIN = -0x08
+ LONG_DIRECT_MAX = byte(0x0f)
+ BC_LONG_ZERO = byte(0xe0)
+
+ LONG_BYTE_MIN = -0x800
+ LONG_BYTE_MAX = 0x7ff
+ BC_LONG_BYTE_ZERO = byte(0xf8)
+
+ LONG_SHORT_MIN = -0x40000
+ LONG_SHORT_MAX = 0x3ffff
+ BC_LONG_SHORT_ZERO = byte(0x3c)
+
+ BC_LONG_INT = byte(0x59)
+
+ BC_MAP = byte('M')
+ BC_MAP_UNTYPED = byte('H')
+
+ BC_NULL = byte('N') // x4e
+
+ BC_OBJECT = byte('O')
+ BC_OBJECT_DEF = byte('C')
+
+ BC_OBJECT_DIRECT = byte(0x60)
+ OBJECT_DIRECT_MAX = byte(0x0f)
+
+ BC_REF = byte(0x51)
+
+ BC_STRING = byte('S') // final string
+ BC_STRING_CHUNK = byte('R') // non-final string
+
+ BC_STRING_DIRECT = byte(0x00)
+ STRING_DIRECT_MAX = byte(0x1f)
+ BC_STRING_SHORT = byte(0x30)
+ STRING_SHORT_MAX = 0x3ff
+
+ BC_TRUE = byte('T')
+
+ P_PACKET_CHUNK = byte(0x4f)
+ P_PACKET = byte('P')
+
+ P_PACKET_DIRECT = byte(0x80)
+ PACKET_DIRECT_MAX = byte(0x7f)
+
+ P_PACKET_SHORT = byte(0x70)
+ PACKET_SHORT_MAX = 0xfff
+ ARRAY_STRING = "[string"
+ ARRAY_INT = "[int"
+ ARRAY_DOUBLE = "[double"
+ ARRAY_FLOAT = "[float"
+ ARRAY_BOOL = "[boolean"
+ ARRAY_LONG = "[long"
+
+ PATH_KEY = "path"
+ GROUP_KEY = "group"
+ INTERFACE_KEY = "interface"
+ VERSION_KEY = "version"
+ TIMEOUT_KEY = "timeout"
+
+ STRING_NIL = ""
+ STRING_TRUE = "true"
+ STRING_FALSE = "false"
+ STRING_ZERO = "0.0"
+ STRING_ONE = "1.0"
+)
+
+// ResponsePayload related consts
+const (
+ Response_OK byte = 20
+ Response_CLIENT_TIMEOUT byte = 30
+ Response_SERVER_TIMEOUT byte = 31
+ Response_BAD_REQUEST byte = 40
+ Response_BAD_RESPONSE byte = 50
+ Response_SERVICE_NOT_FOUND byte = 60
+ Response_SERVICE_ERROR byte = 70
+ Response_SERVER_ERROR byte = 80
+ Response_CLIENT_ERROR byte = 90
+
+ // According to "java dubbo" There are two cases of response:
+ // 1. with attachments
+ // 2. no attachments
+ RESPONSE_WITH_EXCEPTION int32 = 0
+ RESPONSE_VALUE int32 = 1
+ RESPONSE_NULL_VALUE int32 = 2
+ RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS int32 = 3
+ RESPONSE_VALUE_WITH_ATTACHMENTS int32 = 4
+ RESPONSE_NULL_VALUE_WITH_ATTACHMENTS int32 = 5
+)
+
+/**
+ * the dubbo protocol header length is 16 Bytes.
+ * the first 2 Bytes is magic code '0xdabb'
+ * the next 1 Byte is message flags, in which its 16-20 bit is serial id, 21 for event, 22 for two way, 23 for request/response flag
+ * the next 1 Bytes is response state.
+ * the next 8 Bytes is package ID.
+ * the next 4 Bytes is package length.
+ **/
+const (
+ // header length.
+ HEADER_LENGTH = 16
+
+ // magic header
+ MAGIC = uint16(0xdabb)
+ MAGIC_HIGH = byte(0xda)
+ MAGIC_LOW = byte(0xbb)
+
+ // message flag.
+ FLAG_REQUEST = byte(0x80)
+ FLAG_TWOWAY = byte(0x40)
+ FLAG_EVENT = byte(0x20) // for heartbeat
+ SERIAL_MASK = 0x1f
+
+ DUBBO_VERSION = "2.5.4"
+ DUBBO_VERSION_KEY = "dubbo"
+ DEFAULT_DUBBO_PROTOCOL_VERSION = "2.0.2" // Dubbo RPC protocol version, for compatibility, it must not be between 2.0.10 ~ 2.6.2
+ LOWEST_VERSION_FOR_RESPONSE_ATTACHMENT = 2000200
+ DEFAULT_LEN = 8388608 // 8 * 1024 * 1024 default body max length
+)
+
+// regular expressions matching JVM type descriptors
+const (
+ JAVA_IDENT_REGEX = "(?:[_$a-zA-Z][_$a-zA-Z0-9]*)"
+ CLASS_DESC = "(?:L" + JAVA_IDENT_REGEX + "(?:\\/" + JAVA_IDENT_REGEX + ")*;)"
+ ARRAY_DESC = "(?:\\[+(?:(?:[VZBCDFIJS])|" + CLASS_DESC + "))"
+ DESC_REGEX = "(?:(?:[VZBCDFIJS])|" + CLASS_DESC + "|" + ARRAY_DESC + ")"
+)
+
+// Dubbo request response related consts
+var (
+ DubboRequestHeaderBytesTwoWay = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_REQUEST | FLAG_TWOWAY}
+ DubboRequestHeaderBytes = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_REQUEST}
+ DubboResponseHeaderBytes = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, Zero, Response_OK}
+ DubboRequestHeartbeatHeader = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_REQUEST | FLAG_TWOWAY | FLAG_EVENT}
+ DubboResponseHeartbeatHeader = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_EVENT}
+)
+
+// Error part
+var (
+ ErrHeaderNotEnough = errors.New("header buffer too short")
+ ErrBodyNotEnough = errors.New("body buffer too short")
+ ErrJavaException = errors.New("got java exception")
+ ErrIllegalPackage = errors.New("illegal package!")
+)
+
+// DescRegex is the compiled form of DESC_REGEX; it matches one type
+// descriptor (a primitive tag, a class descriptor or an array descriptor).
+// The pattern is a constant, so MustCompile is used: a malformed pattern
+// fails at package initialisation instead of silently leaving DescRegex nil.
+var DescRegex = regexp.MustCompile(DESC_REGEX)
+
+var NilValue = reflect.Zero(reflect.TypeOf((*interface{})(nil)).Elem())
diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go
index 09c3725..392e9af 100644
--- a/protocol/dubbo/dubbo_invoker.go
+++ b/protocol/dubbo/dubbo_invoker.go
@@ -93,6 +93,10 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
di.appendCtx(ctx, inv)
url := di.GetUrl()
+ // default hessian2 serialization, compatible
+ if url.GetParam("serialization", "") == "" {
+ url.SetParam("serialization", constant.HESSIAN2_SERIALIZATION)
+ }
// async
async, err := strconv.ParseBool(inv.AttachmentsByKey(constant.ASYNC_KEY, "false"))
if err != nil {
diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go
index 355dbc8..0814060 100644
--- a/protocol/dubbo/dubbo_protocol.go
+++ b/protocol/dubbo/dubbo_protocol.go
@@ -67,7 +67,6 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
exporter := NewDubboExporter(serviceKey, invoker, dp.ExporterMap())
dp.SetExporterMap(serviceKey, exporter)
logger.Infof("Export service: %s", url.String())
-
// start server
dp.openServer(url)
return exporter
diff --git a/protocol/dubbo/hessian.go b/protocol/dubbo/hessian.go
new file mode 100644
index 0000000..713da84
--- /dev/null
+++ b/protocol/dubbo/hessian.go
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dubbo
+
+import (
+ "math"
+ "reflect"
+ "strconv"
+ "strings"
+ "time"
+)
+
+import (
+ hessian "github.com/apache/dubbo-go-hessian2"
+ "github.com/apache/dubbo-go-hessian2/java_exception"
+ "github.com/pkg/errors"
+)
+
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/extension"
+ "github.com/apache/dubbo-go/common/logger"
+)
+
+// Object is an untyped value. A []Object argument is described to the Java
+// side as java.lang.Object[].
+type Object interface{}
+
+// getArgType returns the Java type name (or primitive descriptor tag) that
+// describes v in a Dubbo request.
+//
+// Primitive values and slices of primitives map to their one-letter tags
+// ("Z", "[I", ...); well-known reference types map to dotted Java class
+// names; structs map to java.lang.Object, other slices/arrays to
+// java.util.List and maps to java.util.Map. An empty string means the type
+// is not supported.
+func getArgType(v interface{}) string {
+	if v == nil {
+		return "V"
+	}
+
+	switch v.(type) {
+	// Serialized tags for base types
+	case bool:
+		return "Z"
+	case []bool:
+		return "[Z"
+	case byte:
+		return "B"
+	case []byte:
+		return "[B"
+	case int8:
+		return "B"
+	case []int8:
+		return "[B"
+	case int16:
+		return "S"
+	case []int16:
+		return "[S"
+	case uint16: // Equivalent to Char of Java
+		return "C"
+	case []uint16:
+		return "[C"
+	// case rune:
+	//	return "C"
+	case int:
+		return "J"
+	case []int:
+		return "[J"
+	case int32:
+		return "I"
+	case []int32:
+		return "[I"
+	case int64:
+		return "J"
+	case []int64:
+		return "[J"
+	case time.Time:
+		return "java.util.Date"
+	case []time.Time:
+		// Object arrays need the terminating ';', exactly like the
+		// String and Object arrays below.
+		return "[Ljava.util.Date;"
+	case float32:
+		return "F"
+	case []float32:
+		return "[F"
+	case float64:
+		return "D"
+	case []float64:
+		return "[D"
+	case string:
+		return "java.lang.String"
+	case []string:
+		return "[Ljava.lang.String;"
+	case []Object:
+		return "[Ljava.lang.Object;"
+	case map[interface{}]interface{}:
+		// return "java.util.HashMap"
+		return "java.util.Map"
+
+	// Serialized tags for complex types
+	default:
+		t := reflect.TypeOf(v)
+		if reflect.Ptr == t.Kind() {
+			// NOTE(review): this takes the type of the reflect.Value wrapper
+			// itself (a struct), not of the pointee, so every pointer argument
+			// is reported as java.lang.Object. Behaviour is kept as is;
+			// confirm whether t.Elem() was intended.
+			t = reflect.TypeOf(reflect.ValueOf(v).Elem())
+		}
+		switch t.Kind() {
+		case reflect.Struct:
+			return "java.lang.Object"
+		case reflect.Slice, reflect.Array:
+			if t.Elem().Kind() == reflect.Struct {
+				return "[Ljava.lang.Object;"
+			}
+			// return "java.util.ArrayList"
+			return "java.util.List"
+		case reflect.Map: // Enter here, map may be map[string]int
+			return "java.util.Map"
+		default:
+			return ""
+		}
+	}
+}
+
+// getArgsTypeList concatenates the descriptors of all args into the single
+// parameter-types string that is written into a request.
+//
+// Names without a dot (primitive tags) are appended unchanged; dotted names
+// get their dots replaced by slashes, and are additionally wrapped in
+// "L...;" unless they already start with '['. If an argument has no known
+// type, the descriptors collected so far are returned together with an error.
+func getArgsTypeList(args []interface{}) (string, error) {
+	var desc strings.Builder
+	for _, arg := range args {
+		name := getArgType(arg)
+		switch {
+		case name == "":
+			return desc.String(), errors.Errorf("cat not get arg %#v type", arg)
+		case !strings.Contains(name, "."):
+			desc.WriteString(name)
+		case strings.HasPrefix(name, "["):
+			desc.WriteString(strings.Replace(name, ".", "/", -1))
+		default:
+			// java.util.List -> Ljava/util/List;
+			desc.WriteString("L" + strings.Replace(name, ".", "/", -1) + ";")
+		}
+	}
+
+	return desc.String(), nil
+}
+
+// HessianSerializer encodes and decodes package bodies with hessian2.
+type HessianSerializer struct{}
+
+// Marshal encodes the body of p: requests go through marshalRequest,
+// everything else through marshalResponse.
+func (h HessianSerializer) Marshal(p DubboPackage) ([]byte, error) {
+	enc := hessian.NewEncoder()
+	if !p.IsRequest() {
+		return marshalResponse(enc, p)
+	}
+	return marshalRequest(enc, p)
+}
+
+// Unmarshal decodes input into p. Heartbeat packages are accepted without
+// looking at the body; otherwise the request or response decoder is chosen
+// from the package type.
+func (h HessianSerializer) Unmarshal(input []byte, p *DubboPackage) error {
+	switch {
+	case p.IsHeartBeat():
+		return nil
+	case p.IsRequest():
+		return unmarshalRequestBody(input, p)
+	default:
+		return unmarshalResponseBody(input, p)
+	}
+}
+
+// marshalResponse hessian-encodes the response payload carried by p.
+//
+// For a Response_OK status the body is: nil for a heartbeat, otherwise a
+// response-type tag (exception / value / null value, in the
+// "with attachments" flavour when the peer version supports it), followed by
+// the exception or result and, if supported, the attachments. For any other
+// status the exception message (or the raw result) is written.
+//
+// The first encoder failure is returned to the caller instead of being
+// dropped; no partial buffer is returned in that case.
+func marshalResponse(encoder *hessian.Encoder, p DubboPackage) ([]byte, error) {
+	header := p.Header
+	response := EnsureResponsePayload(p.Body)
+
+	// encode remembers the first error and turns later writes into no-ops.
+	var encErr error
+	encode := func(v interface{}) {
+		if encErr == nil {
+			encErr = encoder.Encode(v)
+		}
+	}
+
+	switch {
+	case header.ResponseStatus != Response_OK:
+		if response.Exception != nil { // throw error
+			encode(response.Exception.Error())
+		} else {
+			encode(response.RspObj)
+		}
+	case p.IsHeartBeat():
+		encode(nil)
+	default:
+		atta := isSupportResponseAttachment(response.Attachments[DUBBO_VERSION_KEY])
+
+		resWithException, resValue, resNullValue := RESPONSE_WITH_EXCEPTION, RESPONSE_VALUE, RESPONSE_NULL_VALUE
+		if atta {
+			resWithException = RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS
+			resValue = RESPONSE_VALUE_WITH_ATTACHMENTS
+			resNullValue = RESPONSE_NULL_VALUE_WITH_ATTACHMENTS
+		}
+
+		switch {
+		case response.Exception != nil: // throw error
+			encode(resWithException)
+			if t, ok := response.Exception.(java_exception.Throwabler); ok {
+				encode(t)
+			} else {
+				encode(java_exception.NewThrowable(response.Exception.Error()))
+			}
+		case response.RspObj == nil:
+			encode(resNullValue)
+		default:
+			encode(resValue)
+			encode(response.RspObj) // result
+		}
+
+		if atta {
+			encode(response.Attachments) // attachments
+		}
+	}
+
+	if encErr != nil {
+		return nil, errors.WithStack(encErr)
+	}
+	// the encoded body is always followed by a trailing 'N' byte (encNull)
+	return append(encoder.Buffer(), byte('N')), nil
+}
+
+func marshalRequest(encoder *hessian.Encoder, p DubboPackage) ([]byte, error) {
+ service := p.Service
+ request := EnsureRequestPayload(p.Body)
+ encoder.Encode(DEFAULT_DUBBO_PROTOCOL_VERSION)
+ encoder.Encode(service.Path)
+ encoder.Encode(service.Version)
+ encoder.Encode(service.Method)
+
+ args, ok := request.Params.([]interface{})
+
+ if !ok {
+ logger.Infof("request args are: %+v", request.Params)
+ return nil, errors.Errorf("@params is not of type: []interface{}")
+ }
+ types, err := getArgsTypeList(args)
+ if err != nil {
+ return nil, errors.Wrapf(err, " PackRequest(args:%+v)", args)
+ }
+ encoder.Encode(types)
+ for _, v := range args {
+ encoder.Encode(v)
+ }
+
+ request.Attachments[PATH_KEY] = service.Path
+ request.Attachments[VERSION_KEY] = service.Version
+ if len(service.Group) > 0 {
+ request.Attachments[GROUP_KEY] = service.Group
+ }
+ if len(service.Interface) > 0 {
+ request.Attachments[INTERFACE_KEY] = service.Interface
+ }
+ if service.Timeout != 0 {
+ request.Attachments[TIMEOUT_KEY] = strconv.Itoa(int(service.Timeout / time.Millisecond))
+ }
+
+ encoder.Encode(request.Attachments)
+ return encoder.Buffer(), nil
+
+}
+
+var versionInt = make(map[string]int)
+
+// isSupportResponseAttachment reports whether a peer announcing the given
+// dubbo version understands attachments in responses. It exists for
+// compatibility among dubbo versions, see
+// https://github.com/apache/dubbo/blob/dubbo-2.7.1/dubbo-common/src/main/java/org/apache/dubbo/common/Version.java#L96
+//
+// An empty or unparsable version is treated as unsupported, as is anything
+// in the range 2.0.10 ~ 2.6.2.
+//
+// NOTE(review): versionInt is only read here, never written, so within this
+// function it cannot act as a cache; confirm whether it is filled elsewhere.
+func isSupportResponseAttachment(version string) bool {
+	if len(version) == 0 {
+		return false
+	}
+
+	num, known := versionInt[version]
+	if !known {
+		if num = version2Int(version); num == -1 {
+			return false
+		}
+	}
+
+	// 2.0.10 ~ 2.6.2
+	if num >= 2001000 && num <= 2060200 {
+		return false
+	}
+	return num >= LOWEST_VERSION_FOR_RESPONSE_ATTACHMENT
+}
+
+func version2Int(version string) int {
+ var v = 0
+ varr := strings.Split(version, ".")
+ length := len(varr)
+ for key, value := range varr {
+ v0, err := strconv.Atoi(value)
+ if err != nil {
+ return -1
+ }
+ v += v0 * int(math.Pow10((length-key-1)*2))
+ }
+ if length == 3 {
+ return v * 100
+ }
+ return v
+}
+
+// unmarshalRequestBody decodes a hessian-encoded request body into p.
+//
+// p.Body must be nil (a 7-slot slice is then allocated) or a []interface{}
+// with at least 7 slots. The slots are filled in wire order: dubbo version,
+// target, service version, method, parameter descriptors, decoded
+// arguments, attachments (as map[string]string, with the dubbo version added
+// under DUBBO_VERSION_KEY). On success buildServerSidePackageBody turns the
+// slots into the server-side body.
+//
+// Malformed input (a too-short slot slice, descriptors that are not a
+// string, attachments that are not a map) is reported as an error rather
+// than causing a panic.
+func unmarshalRequestBody(body []byte, p *DubboPackage) error {
+	if p.Body == nil {
+		p.SetBody(make([]interface{}, 7))
+	}
+	req, ok := p.Body.([]interface{})
+	if !ok {
+		return errors.Errorf("@reqObj is not of type: []interface{}")
+	}
+	if len(req) < 7 {
+		return errors.Errorf("@reqObj has %d slots, need at least 7", len(req))
+	}
+
+	decoder := hessian.NewDecoder(body)
+
+	// slots 0-4: dubbo version, target, service version, method, args types
+	for i := 0; i < 5; i++ {
+		v, err := decoder.Decode()
+		if err != nil {
+			return errors.WithStack(err)
+		}
+		req[i] = v
+	}
+	dubboVersion := req[0]
+
+	argsTypes, ok := req[4].(string)
+	if !ok {
+		return errors.Errorf("get wrong args types: %+v", req[4])
+	}
+
+	// one encoded argument follows for every descriptor found
+	ats := hessian.DescRegex.FindAllString(argsTypes, -1)
+	var args []interface{}
+	for i := 0; i < len(ats); i++ {
+		arg, err := decoder.Decode()
+		if err != nil {
+			return errors.WithStack(err)
+		}
+		args = append(args, arg)
+	}
+	req[5] = args
+
+	attachments, err := decoder.Decode()
+	if err != nil {
+		return errors.WithStack(err)
+	}
+	v, ok := attachments.(map[interface{}]interface{})
+	if !ok {
+		return errors.Errorf("get wrong attachments: %+v", attachments)
+	}
+	v[DUBBO_VERSION_KEY] = dubboVersion
+	req[6] = hessian.ToMapStringString(v)
+	buildServerSidePackageBody(p)
+	return nil
+}
+
+// unmarshalResponseBody decodes a hessian-encoded response body into the
+// ResponsePayload held by p (one is created when p.Body is nil).
+//
+// The first decoded value is the response-type tag. Depending on it the
+// exception or the result is decoded next, followed by the attachments for
+// the "with attachments" tags. Exceptions are stored in the payload, results
+// are reflected into the payload's RspObj. An unrecognised tag leaves the
+// payload untouched and returns nil.
+func unmarshalResponseBody(body []byte, p *DubboPackage) error {
+	decoder := hessian.NewDecoder(body)
+	rspType, err := decoder.Decode()
+	if p.Body == nil {
+		p.SetBody(&ResponsePayload{})
+	}
+	if err != nil {
+		return errors.WithStack(err)
+	}
+	response := EnsureResponsePayload(p.Body)
+
+	switch rspType {
+	case RESPONSE_WITH_EXCEPTION, RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
+		expt, err := decoder.Decode()
+		if err != nil {
+			return errors.WithStack(err)
+		}
+		if rspType == RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS {
+			atta, err := decodeResponseAttachments(decoder)
+			if err != nil {
+				return err
+			}
+			response.Attachments = atta
+		}
+
+		if e, ok := expt.(error); ok {
+			response.Exception = e
+		} else {
+			response.Exception = errors.Errorf("got exception: %+v", expt)
+		}
+		return nil
+
+	case RESPONSE_VALUE, RESPONSE_VALUE_WITH_ATTACHMENTS:
+		rsp, err := decoder.Decode()
+		if err != nil {
+			return errors.WithStack(err)
+		}
+		if rspType == RESPONSE_VALUE_WITH_ATTACHMENTS {
+			atta, err := decodeResponseAttachments(decoder)
+			if err != nil {
+				return err
+			}
+			response.Attachments = atta
+		}
+
+		return errors.WithStack(hessian.ReflectResponse(rsp, response.RspObj))
+
+	case RESPONSE_NULL_VALUE, RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
+		if rspType == RESPONSE_NULL_VALUE_WITH_ATTACHMENTS {
+			atta, err := decodeResponseAttachments(decoder)
+			if err != nil {
+				return err
+			}
+			response.Attachments = atta
+		}
+		return nil
+	}
+	return nil
+}
+
+// decodeResponseAttachments decodes the next value from decoder and converts
+// it to a map[string]string. It fails when decoding fails or when the value
+// is not a map[interface{}]interface{}.
+func decodeResponseAttachments(decoder *hessian.Decoder) (map[string]string, error) {
+	attachments, err := decoder.Decode()
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+	v, ok := attachments.(map[interface{}]interface{})
+	if !ok {
+		return nil, errors.Errorf("get wrong attachments: %+v", attachments)
+	}
+	return hessian.ToMapStringString(v), nil
+}
+
+func buildServerSidePackageBody(pkg *DubboPackage) {
+ req := pkg.GetBody().([]interface{}) // length of body should be 7
+ if len(req) > 0 {
+ var dubboVersion, argsTypes string
+ var args []interface{}
+ var attachments map[string]string
+ svc := Service{}
+ if req[0] != nil {
+ dubboVersion = req[0].(string)
+ }
+ if req[1] != nil {
+ svc.Path = req[1].(string)
+ }
+ if req[2] != nil {
+ svc.Version = req[2].(string)
+ }
+ if req[3] != nil {
+ svc.Method = req[3].(string)
+ }
+ if req[4] != nil {
+ argsTypes = req[4].(string)
+ }
+ if req[5] != nil {
+ args = req[5].([]interface{})
+ }
+ if req[6] != nil {
+ attachments = req[6].(map[string]string)
+ }
+ if svc.Path == "" && len(attachments[constant.PATH_KEY]) > 0 {
+ svc.Path = attachments[constant.PATH_KEY]
+ }
+ if _, ok := attachments[constant.INTERFACE_KEY]; ok {
+ svc.Interface = attachments[constant.INTERFACE_KEY]
+ } else {
+ svc.Interface = svc.Path
+ }
+ if len(attachments[constant.GROUP_KEY]) > 0 {
+ svc.Group = attachments[constant.GROUP_KEY]
+ }
+ pkg.SetService(svc)
+ pkg.SetBody(map[string]interface{}{
+ "dubboVersion": dubboVersion,
+ "argsTypes": argsTypes,
+ "args": args,
+ "service": common.ServiceMap.GetService(DUBBO, svc.Path), // path as a key
+ "attachments": attachments,
+ })
+ }
+}
+
+// init registers HessianSerializer under the name "hessian2" via
+// extension.SetSerializer.
+func init() {
+	extension.SetSerializer("hessian2", HessianSerializer{})
+}
diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go
index 0251b78..31eaed6 100644
--- a/protocol/dubbo/listener.go
+++ b/protocol/dubbo/listener.go
@@ -104,17 +104,10 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
return
}
- if p.Header.Type&hessian.PackageHeartbeat != 0x00 {
- if p.Header.Type&hessian.PackageResponse != 0x00 {
- logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body)
- if p.Err != nil {
- logger.Errorf("rpc heartbeat response{error: %#v}", p.Err)
- }
- h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID))
- } else {
- logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body)
- p.Header.ResponseStatus = hessian.Response_OK
- reply(session, p, hessian.PackageHeartbeat)
+ if p.Header.Type&PackageHeartbeat != 0x00 {
+ logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body)
+ if p.Err != nil {
+ logger.Errorf("rpc heartbeat response{error: %#v}", p.Err)
}
return
}
@@ -137,6 +130,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
if pendingResponse.callback == nil {
pendingResponse.done <- struct{}{}
} else {
+ logger.Info("proxy service callback")
pendingResponse.callback(pendingResponse.GetCallResponse())
}
}
@@ -228,82 +222,84 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
logger.Errorf("illegal package{%#v}", pkg)
return
}
- p.Header.ResponseStatus = hessian.Response_OK
+ p.SetResponseStatus(hessian.Response_OK)
+ //p.Header.ResponseStatus = hessian.Response_OK
// heartbeat
- if p.Header.Type&hessian.PackageHeartbeat != 0x00 {
- logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body)
- reply(session, p, hessian.PackageHeartbeat)
+ if p.GetHeader().Type&PackageHeartbeat != 0x00 {
+ logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.GetHeader(), p.GetService(), p.GetBody())
+ h.reply(session, p, PackageHeartbeat)
return
}
twoway := true
// not twoway
- if p.Header.Type&hessian.PackageRequest_TwoWay == 0x00 {
+ if p.GetHeader().Type&PackageRequest_TwoWay == 0x00 {
twoway = false
}
defer func() {
if e := recover(); e != nil {
- p.Header.ResponseStatus = hessian.Response_SERVER_ERROR
+ p.SetResponseStatus(hessian.Response_SERVER_ERROR)
if err, ok := e.(error); ok {
logger.Errorf("OnMessage panic: %+v", perrors.WithStack(err))
- p.Body = perrors.WithStack(err)
+ p.SetBody(perrors.WithStack(err))
} else if err, ok := e.(string); ok {
logger.Errorf("OnMessage panic: %+v", perrors.New(err))
- p.Body = perrors.New(err)
+ p.SetBody(perrors.New(err))
} else {
logger.Errorf("OnMessage panic: %+v, this is impossible.", e)
- p.Body = e
+ p.SetBody(e)
}
if !twoway {
return
}
- reply(session, p, hessian.PackageResponse)
+ h.reply(session, p, PackageResponse)
}
}()
- u := common.NewURLWithOptions(common.WithPath(p.Service.Path), common.WithParams(url.Values{}),
- common.WithParamsValue(constant.GROUP_KEY, p.Service.Group),
- common.WithParamsValue(constant.INTERFACE_KEY, p.Service.Interface),
- common.WithParamsValue(constant.VERSION_KEY, p.Service.Version))
+ u := common.NewURLWithOptions(common.WithPath(p.GetService().Path), common.WithParams(url.Values{}),
+ common.WithParamsValue(constant.GROUP_KEY, p.GetService().Group),
+ common.WithParamsValue(constant.INTERFACE_KEY, p.GetService().Interface),
+ common.WithParamsValue(constant.VERSION_KEY, p.GetService().Version))
exporter, _ := dubboProtocol.ExporterMap().Load(u.ServiceKey())
if exporter == nil {
err := fmt.Errorf("don't have this exporter, key: %s", u.ServiceKey())
logger.Errorf(err.Error())
- p.Header.ResponseStatus = hessian.Response_OK
- p.Body = err
- reply(session, p, hessian.PackageResponse)
+ p.SetResponseStatus(Response_OK)
+ p.SetBody(err)
+ h.reply(session, p, PackageResponse)
return
}
invoker := exporter.(protocol.Exporter).GetInvoker()
if invoker != nil {
- attachments := p.Body.(map[string]interface{})["attachments"].(map[string]string)
+ attachments := p.GetBody().(map[string]interface{})["attachments"].(map[string]string)
attachments[constant.LOCAL_ADDR] = session.LocalAddr()
attachments[constant.REMOTE_ADDR] = session.RemoteAddr()
- args := p.Body.(map[string]interface{})["args"].([]interface{})
- inv := invocation.NewRPCInvocation(p.Service.Method, args, attachments)
+ args := p.GetBody().(map[string]interface{})["args"].([]interface{})
+ inv := invocation.NewRPCInvocation(p.GetService().Method, args, attachments)
ctx := rebuildCtx(inv)
-
result := invoker.Invoke(ctx, inv)
+ logger.Debugf("invoker result: %+v", result)
if err := result.Error(); err != nil {
- p.Header.ResponseStatus = hessian.Response_OK
- p.Body = hessian.NewResponse(nil, err, result.Attachments())
+ p.SetResponseStatus(Response_OK)
+ p.SetBody(&ResponsePayload{nil, err, result.Attachments()})
} else {
res := result.Result()
- p.Header.ResponseStatus = hessian.Response_OK
- p.Body = hessian.NewResponse(res, nil, result.Attachments())
+ p.SetResponseStatus(Response_OK)
+ p.SetBody(&ResponsePayload{res, nil, result.Attachments()})
+ //logger.Debugf("service return response %v", res)
}
}
if !twoway {
return
}
- reply(session, p, hessian.PackageResponse)
+ h.reply(session, p, PackageResponse)
}
// OnCron ...
@@ -347,23 +343,25 @@ func rebuildCtx(inv *invocation.RPCInvocation) context.Context {
return ctx
}
-func reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) {
- resp := &DubboPackage{
- Header: hessian.DubboHeader{
- SerialID: req.Header.SerialID,
- Type: tp,
- ID: req.Header.ID,
- ResponseStatus: req.Header.ResponseStatus,
- },
+func (h *RpcServerHandler) reply(session getty.Session, req *DubboPackage, tp PackageType) {
+ header := DubboHeader{
+ SerialID: req.GetHeader().SerialID,
+ Type: tp,
+ ID: req.GetHeader().ID,
+ BodyLen: 0,
+ ResponseStatus: req.GetHeader().ResponseStatus,
+ }
+ resp := NewServerResponsePackage(header)
+ if err := loadSerializer(resp); err != nil {
+ logger.Errorf("reply error %v", err)
+ return
}
- if req.Header.Type&hessian.PackageRequest != 0x00 {
- resp.Body = req.Body
- } else {
- resp.Body = nil
+ if req.GetHeader().Type&PackageRequest != 0x00 {
+ resp.SetBody(req.GetBody())
}
if err := session.WritePkg(resp, WritePkg_Timeout); err != nil {
- logger.Errorf("WritePkg error: %#v, %#v", perrors.WithStack(err), req.Header)
+ logger.Errorf("WritePkg error: %#v, %#v", perrors.WithStack(err), req.GetHeader())
}
}
diff --git a/protocol/dubbo/package.go b/protocol/dubbo/package.go
new file mode 100644
index 0000000..15d3b07
--- /dev/null
+++ b/protocol/dubbo/package.go
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dubbo
+
+import (
+ "bufio"
+ "bytes"
+ "fmt"
+ "time"
+)
+
+import (
+ "github.com/pkg/errors"
+)
+
+// DubboHeader holds the header fields of a dubbo package.
+type DubboHeader struct {
+	SerialID       byte        // serialization id
+	Type           PackageType // package type flags (request, response, heartbeat, ...)
+	ID             int64       // package id
+	BodyLen        int         // body length; GetLen adds HEADER_LENGTH to it
+	ResponseStatus byte        // response status, e.g. Response_OK
+}
+
+// Service defines service instance: the path, interface, group, version and
+// method a package is addressed to, plus the request timeout.
+type Service struct {
+	Path      string
+	Interface string
+	Group     string
+	Version   string
+	Method    string
+	Timeout   time.Duration // request timeout
+}
+
+// DubboPackage bundles a header, the addressed service and a body with the
+// codec used to read and write it.
+type DubboPackage struct {
+	Header  DubboHeader
+	Service Service
+	Body    interface{}
+	Err     error
+	codec   *DubboCodec // set by the New*Package constructors; nil on a zero value
+}
+
+// String formats the header, service and body of the package for logging.
+func (p DubboPackage) String() string {
+	return fmt.Sprintf("HessianPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body)
+}
+
+// ReadHeader reads the package header from the codec into p.Header.
+// Like Marshal, it returns an error instead of dereferencing a nil codec
+// (for example on a zero-value DubboPackage).
+func (p *DubboPackage) ReadHeader() error {
+	if p.codec == nil {
+		return errors.New("codec is nil")
+	}
+	return p.codec.ReadHeader(&p.Header)
+}
+
+// Marshal writes the package through its codec and returns the encoded bytes
+// wrapped in a buffer. It fails when no codec is attached or when the codec
+// reports an error.
+func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
+	codec := p.codec
+	if codec == nil {
+		return nil, errors.New("codec is nil")
+	}
+	raw, err := codec.Write(*p)
+	if err == nil {
+		return bytes.NewBuffer(raw), nil
+	}
+	return nil, errors.WithStack(err)
+}
+
+// Unmarshal reads the package from its codec into p.
+// Like Marshal, it returns an error instead of dereferencing a nil codec
+// (for example on a zero-value DubboPackage).
+func (p *DubboPackage) Unmarshal() error {
+	if p.codec == nil {
+		return errors.New("codec is nil")
+	}
+	return p.codec.Read(p)
+}
+
+// IsHeartBeat reports whether the heartbeat bit is set in the package type.
+func (p DubboPackage) IsHeartBeat() bool {
+	return p.Header.Type&PackageHeartbeat != 0
+}
+
+// IsRequest reports whether the package type has the one-way or the two-way
+// request bit set.
+func (p DubboPackage) IsRequest() bool {
+	return p.Header.Type&(PackageRequest_TwoWay|PackageRequest) != 0
+}
+
+// IsResponse reports whether the package type is exactly PackageResponse.
+// Note this is an equality test, not a bit test like the other predicates.
+func (p DubboPackage) IsResponse() bool {
+	return p.Header.Type == PackageResponse
+}
+
+// IsResponseWithException reports whether both the response bit and the
+// response-exception bit are set in the package type.
+func (p DubboPackage) IsResponseWithException() bool {
+	want := PackageResponse | PackageResponse_Exception
+	return want == p.Header.Type&want
+}
+
+// GetBodyLen returns the body length recorded in the header.
+func (p DubboPackage) GetBodyLen() int {
+	return p.Header.BodyLen
+}
+
+// GetLen returns the total package length: header length plus body length.
+func (p DubboPackage) GetLen() int {
+	return HEADER_LENGTH + p.Header.BodyLen
+}
+
+// GetBody returns the package body.
+func (p DubboPackage) GetBody() interface{} {
+	return p.Body
+}
+
+// SetBody replaces the package body.
+func (p *DubboPackage) SetBody(body interface{}) {
+	p.Body = body
+}
+
+// SetHeader replaces the package header.
+func (p *DubboPackage) SetHeader(header DubboHeader) {
+	p.Header = header
+}
+
+// SetService replaces the service the package is addressed to.
+func (p *DubboPackage) SetService(svc Service) {
+	p.Service = svc
+}
+
+// SetID sets the package id in the header.
+func (p *DubboPackage) SetID(id int64) {
+	p.Header.ID = id
+}
+
+// GetHeader returns a copy of the package header.
+func (p DubboPackage) GetHeader() DubboHeader {
+	return p.Header
+}
+
+// GetService returns a copy of the service the package is addressed to.
+func (p DubboPackage) GetService() Service {
+	return p.Service
+}
+
+// SetResponseStatus sets the response status in the header.
+func (p *DubboPackage) SetResponseStatus(status byte) {
+	p.Header.ResponseStatus = status
+}
+
+// SetSerializer hands the serializer to the package's codec.
+// NOTE(review): unlike Marshal there is no nil check on p.codec here;
+// confirm a zero-value package can never reach this call.
+func (p *DubboPackage) SetSerializer(serializer Serializer) {
+	p.codec.SetSerializer(serializer)
+}
+
+// NewClientResponsePackage returns a package whose codec reads from data and
+// whose body is an empty ResponsePayload.
+func NewClientResponsePackage(data []byte) *DubboPackage {
+	return &DubboPackage{
+		Header:  DubboHeader{},
+		Service: Service{},
+		Body:    &ResponsePayload{},
+		Err:     nil,
+		codec:   NewDubboCodec(bufio.NewReaderSize(bytes.NewBuffer(data), len(data))),
+	}
+}
+
+// server side receive request package, just for deserialization
+// The body is pre-allocated as a 7-slot slice for the decoded request parts.
+func NewServerRequestPackage(data []byte) *DubboPackage {
+	return &DubboPackage{
+		Header:  DubboHeader{},
+		Service: Service{},
+		Body:    make([]interface{}, 7),
+		Err:     nil,
+		codec:   NewDubboCodec(bufio.NewReaderSize(bytes.NewBuffer(data), len(data))),
+	}
+
+}
+
+// client side request package, just for serialization
+// The codec is created without a reader.
+func NewClientRequestPackage(header DubboHeader, svc Service) *DubboPackage {
+	return &DubboPackage{
+		Header:  header,
+		Service: svc,
+		Body:    nil,
+		Err:     nil,
+		codec:   NewDubboCodec(nil),
+	}
+}
+
+// server side response package, just for serialization
+// The codec is created without a reader.
+func NewServerResponsePackage(header DubboHeader) *DubboPackage {
+	return &DubboPackage{
+		Header: header,
+		Body:   nil,
+		Err:    nil,
+		codec:  NewDubboCodec(nil),
+	}
+}
+
+func NewDubboPackage(data *bytes.Buffer) *DubboPackage {
+ var codec *DubboCodec
+ if data == nil {
+ codec = NewDubboCodec(nil)
+ } else {
+ codec = NewDubboCodec(bufio.NewReaderSize(data, len(data.Bytes())))
+ }
+ return &DubboPackage{
+ Header: DubboHeader{},
+ Service: Service{},
+ Body: nil,
+ Err: nil,
+ codec: codec,
+ }
+}
diff --git a/protocol/dubbo/proto.go b/protocol/dubbo/proto.go
new file mode 100644
index 0000000..17e9aeb
--- /dev/null
+++ b/protocol/dubbo/proto.go
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dubbo
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "reflect"
+ "strconv"
+ "strings"
+ "time"
+ "encoding/binary"
+)
+
+import (
+ "github.com/pkg/errors"
+ "github.com/golang/protobuf/proto"
+ "github.com/matttproud/golang_protobuf_extensions/pbutil"
+)
+
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/extension"
+ "github.com/apache/dubbo-go/common/constant"
+ pb "github.com/apache/dubbo-go/protocol/dubbo/proto"
+)
+
+// ProtoSerializer marshals and unmarshals dubbo package bodies as
+// length-delimited protobuf messages. It is stateless.
+type ProtoSerializer struct{}
+
+// Marshal encodes the body of pkg. A heartbeat is encoded as the single byte
+// 'N'; a nil body is an error; otherwise the request or response encoder is
+// chosen from the package type.
+func (p ProtoSerializer) Marshal(pkg DubboPackage) ([]byte, error) {
+	switch {
+	case pkg.IsHeartBeat():
+		return []byte{byte('N')}, nil
+	case pkg.Body == nil:
+		return nil, errors.New("package body should not be nil")
+	case pkg.IsRequest():
+		return marshalRequestProto(pkg)
+	default:
+		return marshalResponseProto(pkg)
+	}
+}
+
+// Unmarshal decodes data into pkg, using the request decoder when pkg is a
+// request and the response decoder otherwise.
+func (p ProtoSerializer) Unmarshal(data []byte, pkg *DubboPackage) error {
+	if !pkg.IsRequest() {
+		return unmarshalResponseProto(data, pkg)
+	}
+	return unmarshalRequestProto(data, pkg)
+}
+
+// unmarshalResponseProto decodes a protobuf response body from data into the
+// ResponsePayload held by pkg, creating an empty payload when pkg.Body is nil.
+//
+// The body is a response type, followed by either a ThrowableProto (exception
+// types) or the response message decoded into payload.RspObj, followed by an
+// attachment map when the type says attachments are present.
+func unmarshalResponseProto(data []byte, pkg *DubboPackage) error {
+	if pkg.Body == nil {
+		pkg.SetBody(NewResponsePayload(nil, nil, nil))
+	}
+	payload := EnsureResponsePayload(pkg.Body)
+	reader := bytes.NewBuffer(data)
+
+	var responseType int32
+	if err := readByte(reader, &responseType); err != nil {
+		return err
+	}
+	withException := responseType == RESPONSE_WITH_EXCEPTION ||
+		responseType == RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS
+	withAttachments := responseType == RESPONSE_VALUE_WITH_ATTACHMENTS ||
+		responseType == RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS
+
+	if withException {
+		throwable := &pb.ThrowableProto{}
+		if err := readObject(reader, throwable); err != nil {
+			return err
+		}
+		// only the message survives; class name, stack trace and cause are dropped
+		payload.Exception = errors.New(throwable.OriginalMessage)
+	} else {
+		// the reply object supplied by the caller is the decode target
+		target, isProto := payload.RspObj.(proto.Message)
+		if !isProto {
+			return errors.New("response rspobj not protobuf message")
+		}
+		if err := readObject(reader, target); err != nil {
+			return err
+		}
+	}
+
+	if !withAttachments {
+		return nil
+	}
+	received := &pb.Map{}
+	if err := readObject(reader, received); err != nil {
+		return err
+	}
+	if payload.Attachments == nil {
+		payload.Attachments = received.Attachments
+		return nil
+	}
+	// merge into the existing map; received values win on key collisions
+	for key, val := range received.Attachments {
+		payload.Attachments[key] = val
+	}
+	return nil
+}
+
+// unmarshalRequestProto decodes a protobuf request body from data and stores
+// the result on pkg.
+//
+// The body is five length-delimited strings (dubbo version, service path,
+// service version, method name, argument type descriptor), then the single
+// length-delimited argument message, then the attachment map. The argument is
+// kept as raw bytes so the proxy method can unmarshal it into its own type.
+//
+// On success pkg.Service is populated (path, version, method, interface and
+// group, with path/interface/group falling back to or taken from the
+// attachments) and pkg.Body is a map with the keys "dubboVersion", "args",
+// "service" and "attachments". An error is returned if any field cannot be
+// read or the argument length exceeds the remaining data.
+func unmarshalRequestProto(data []byte, pkg *DubboPackage) error {
+	var dubboVersion string
+	var svcPath string
+	var svcVersion string
+	var svcMethod string
+	// NOTE: protobuf rpc just have exact one parameter, while golang doesn't need this field
+	var argsType string
+	buf := bytes.NewBuffer(data)
+	for _, field := range []*string{&dubboVersion, &svcPath, &svcVersion, &svcMethod, &argsType} {
+		if err := readUTF(buf, field); err != nil {
+			return err
+		}
+	}
+	// get raw body bytes for proxy methods to unmarshal
+	var protoMsgLength int
+	if err := readDelimitedLength(buf, &protoMsgLength); err != nil {
+		return err
+	}
+	// The length comes off the wire: reject it before allocating if it cannot
+	// be satisfied by what is left, so a truncated or hostile frame neither
+	// over-allocates nor yields a silently zero-padded argument.
+	if protoMsgLength < 0 || protoMsgLength > buf.Len() {
+		return errors.New("illegal msg length")
+	}
+	argBytes := make([]byte, protoMsgLength)
+	if _, err := io.ReadFull(buf, argBytes); err != nil {
+		return err
+	}
+	// unmarshal attachments
+	m := &pb.Map{}
+	if err := readObject(buf, m); err != nil {
+		return err
+	}
+	svc := Service{}
+	svc.Version = svcVersion
+	svc.Method = svcMethod
+	// just as hessian
+	svc.Path = svcPath
+	if svc.Path == "" && len(m.Attachments[constant.PATH_KEY]) > 0 {
+		svc.Path = m.Attachments[constant.PATH_KEY]
+	}
+	if iface, ok := m.Attachments[constant.INTERFACE_KEY]; ok {
+		svc.Interface = iface
+	} else {
+		svc.Interface = svc.Path
+	}
+	// the request encoder sends the group as an attachment; restore it here
+	if group := m.Attachments[constant.GROUP_KEY]; len(group) > 0 {
+		svc.Group = group
+	}
+	pkg.SetService(svc)
+	pkg.SetBody(map[string]interface{}{
+		"dubboVersion": dubboVersion,
+		"args":         []interface{}{argBytes},
+		"service":      common.ServiceMap.GetService(DUBBO, svc.Path), // path as a key
+		"attachments":  m.Attachments,
+	})
+
+	return nil
+}
+
+// marshalRequestProto encodes the request carried by pkg as a protobuf body.
+//
+// The request payload's Params must be a []interface{} holding exactly one
+// proto.Message. The output is five length-delimited strings (dubbo version,
+// service path, service version, method, argument type descriptor), the
+// argument message, and an attachment map built from pkg.Service.
+//
+// The argument type descriptor comes from the argument's JavaClassName()
+// method when it has one; otherwise an empty string is written.
+// An error is returned, rather than a panic, when the argument is not a
+// proto.Message or JavaClassName has an unexpected signature.
+func marshalRequestProto(pkg DubboPackage) ([]byte, error) {
+	request := EnsureRequestPayload(pkg.Body)
+	args, ok := request.Params.([]interface{})
+	if !ok {
+		return nil, errors.New("proto buffer args should be marshaled in []byte")
+	}
+	// NOTE: protobuf rpc just has exact one parameter
+	if len(args) != 1 {
+		return nil, errors.New("illegal protobuf service, len(arg) should equal 1")
+	}
+	// Validate the argument before writing anything. This also rejects a nil
+	// argument, which would otherwise make the reflection below panic.
+	protoMsg, ok := args[0].(proto.Message)
+	if !ok {
+		return nil, errors.New("illegal protobuf service, arg should be proto.Message")
+	}
+
+	buf := bytes.NewBuffer(make([]byte, 0))
+	// dubbo version, service path, service version, service method
+	for _, field := range []string{DUBBO_VERSION, pkg.Service.Path, pkg.Service.Version, pkg.Service.Method} {
+		if err := writeUTF(buf, field); err != nil {
+			return nil, err
+		}
+	}
+	// parameter types desc
+	argType := "" // defensive default when the message has no JavaClassName
+	if mv := reflect.ValueOf(args[0]).MethodByName("JavaClassName"); mv.IsValid() {
+		// Call panics on an argument count mismatch, so check it first
+		if mv.Type().NumIn() != 0 {
+			return nil, errors.New("JavaClassName method should take no arguments")
+		}
+		javaCls := mv.Call([]reflect.Value{})
+		if len(javaCls) != 1 {
+			return nil, errors.New("JavaClassName method should return exact 1 result")
+		}
+		javaClsStr, ok := javaCls[0].Interface().(string)
+		if !ok {
+			return nil, errors.New("JavaClassName method should return string")
+		}
+		argType = getJavaArgType(javaClsStr)
+	}
+	if err := writeUTF(buf, argType); err != nil {
+		return nil, err
+	}
+	// consumer args
+	if err := writeObject(buf, protoMsg); err != nil {
+		return nil, err
+	}
+	// attachments
+	atta := make(map[string]string)
+	atta[PATH_KEY] = pkg.Service.Path
+	atta[VERSION_KEY] = pkg.Service.Version
+	if len(pkg.Service.Group) > 0 {
+		atta[GROUP_KEY] = pkg.Service.Group
+	}
+	if len(pkg.Service.Interface) > 0 {
+		atta[INTERFACE_KEY] = pkg.Service.Interface
+	}
+	if pkg.Service.Timeout != 0 {
+		// the timeout is sent as a whole number of milliseconds
+		atta[TIMEOUT_KEY] = strconv.Itoa(int(pkg.Service.Timeout / time.Millisecond))
+	}
+	m := pb.Map{Attachments: atta}
+	if err := writeObject(buf, &m); err != nil {
+		return nil, err
+	}
+	return buf.Bytes(), nil
+}
+
+func marshalResponseProto(pkg DubboPackage) ([]byte, error) {
+ response := EnsureResponsePayload(pkg.Body)
+ buf := bytes.NewBuffer(make([]byte, 0))
+ responseType := RESPONSE_VALUE
+ hasAttachments := false
+ if response.Attachments != nil {
+ responseType = RESPONSE_VALUE_WITH_ATTACHMENTS
+ hasAttachments = true
+ } else {
+ responseType = RESPONSE_VALUE
+ }
+ if response.Exception != nil {
+ if hasAttachments {
+ responseType = RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS
+ } else {
+ responseType = RESPONSE_WITH_EXCEPTION
+ }
+ }
+ // write response type
+ if err := writeByte(buf, responseType); err != nil {
+ return nil, err
+ }
+ if response.Exception != nil {
+ // deal with exception
+ throwable := pb.ThrowableProto{OriginalMessage: response.Exception.Error()}
+ if err := writeObject(buf, &throwable); err != nil {
+ return nil, err
+ }
+ } else {
+ res, ok := response.RspObj.(proto.Message)
+ if !ok {
+ return nil, errors.New("proto buffer params should be marshaled in proto.Message")
+ }
+ // response body
+ if err := writeObject(buf, res); err != nil {
+ return nil, err
+ }
+ }
+
+ if hasAttachments {
+ attachments := pb.Map{Attachments: response.Attachments}
+ if err := writeObject(buf, &attachments); err != nil {
+ return nil, err
+ }
+ }
+ return buf.Bytes(), nil
+}
+
+// init registers ProtoSerializer with the serializer extension registry under
+// the name "protobuf".
+func init() {
+ extension.SetSerializer("protobuf", ProtoSerializer{})
+}
+
+// getJavaArgType turns a dotted Java class name into a JVM-style object type
+// descriptor, e.g. "a.b.C" becomes "La/b/C;".
+func getJavaArgType(javaClsName string) string {
+	slashed := strings.ReplaceAll(javaClsName, ".", "/")
+	return fmt.Sprintf("L%s;", slashed)
+}
+
+// writeUTF writes value to writer as a length-delimited StringValue message.
+func writeUTF(writer io.Writer, value string) error {
+	wrapped := &pb.StringValue{Value: value}
+	if _, err := pbutil.WriteDelimited(writer, wrapped); err != nil {
+		return err
+	}
+	return nil
+}
+
+// writeObject writes value to writer as a length-delimited protobuf message.
+func writeObject(writer io.Writer, value proto.Message) error {
+	if _, err := pbutil.WriteDelimited(writer, value); err != nil {
+		return err
+	}
+	return nil
+}
+
+// writeByte writes v to writer as a length-delimited Int32Value message.
+func writeByte(writer io.Writer, v int32) error {
+	if _, err := pbutil.WriteDelimited(writer, &pb.Int32Value{Value: v}); err != nil {
+		return err
+	}
+	return nil
+}
+
+// readUTF reads one length-delimited StringValue message from reader and
+// stores its value in *value. *value is left untouched when reading fails.
+func readUTF(reader io.Reader, value *string) error {
+	decoded := new(pb.StringValue)
+	if _, err := pbutil.ReadDelimited(reader, decoded); err != nil {
+		return err
+	}
+	*value = decoded.Value
+	return nil
+}
+
+// readObject reads one length-delimited protobuf message from reader into
+// value.
+func readObject(reader io.Reader, value proto.Message) error {
+	_, err := pbutil.ReadDelimited(reader, value)
+	return err
+}
+
+// readByte reads one length-delimited Int32Value message from reader, just as
+// the java protobuf serialization does, and stores its value in *value.
+// *value is left untouched when reading fails.
+func readByte(reader io.Reader, value *int32) error {
+	decoded := new(pb.Int32Value)
+	if _, err := pbutil.ReadDelimited(reader, decoded); err != nil {
+		return err
+	}
+	*value = decoded.Value
+	return nil
+}
+
+// readDelimitedLength reads only the varint length prefix of a
+// length-delimited message from reader and stores the decoded length in
+// *length, leaving the message bytes themselves unread.
+//
+// It returns an error if the reader fails before a complete varint has been
+// read, or if no varint can be decoded within binary.MaxVarintLen32 bytes.
+func readDelimitedLength(reader io.Reader, length *int) error {
+ var headerBuf [binary.MaxVarintLen32]byte
+ var bytesRead, varIntBytes int
+ var messageLength uint64
+ for varIntBytes == 0 { // i.e. no varint has been decoded yet.
+ if bytesRead >= len(headerBuf) {
+ return errors.New("invalid varint32 encountered")
+ }
+ // We have to read byte by byte here to avoid reading more bytes
+ // than required. Each read byte is appended to what we have
+ // read before.
+ newBytesRead, err := reader.Read(headerBuf[bytesRead : bytesRead+1])
+ if newBytesRead == 0 {
+ if err != nil {
+ return err
+ }
+ // A Reader should not return (0, nil), but if it does,
+ // it should be treated as no-op (according to the
+ // Reader contract). So let's go on...
+ continue
+ }
+ bytesRead += newBytesRead
+ // Now present everything read so far to the varint decoder and
+ // see if a varint can be decoded already.
+ messageLength, varIntBytes = proto.DecodeVarint(headerBuf[:bytesRead])
+ }
+ *length = int(messageLength)
+ return nil
+}
diff --git a/protocol/dubbo/proto/payload.pb.go b/protocol/dubbo/proto/payload.pb.go
new file mode 100644
index 0000000..eeca59a
--- /dev/null
+++ b/protocol/dubbo/proto/payload.pb.go
@@ -0,0 +1,328 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: proto/payload.proto
+
+package payload
+
+import (
+ fmt "fmt"
+ proto "github.com/golang/protobuf/proto"
+ math "math"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+
+// equivalent java StringValue
+type StringValue struct {
+ Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *StringValue) Reset() { *m = StringValue{} }
+func (m *StringValue) String() string { return proto.CompactTextString(m) }
+func (*StringValue) ProtoMessage() {}
+func (*StringValue) Descriptor() ([]byte, []int) {
+ return fileDescriptor_434bbf44284586dc, []int{0}
+}
+
+func (m *StringValue) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_StringValue.Unmarshal(m, b)
+}
+func (m *StringValue) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_StringValue.Marshal(b, m, deterministic)
+}
+func (m *StringValue) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_StringValue.Merge(m, src)
+}
+func (m *StringValue) XXX_Size() int {
+ return xxx_messageInfo_StringValue.Size(m)
+}
+func (m *StringValue) XXX_DiscardUnknown() {
+ xxx_messageInfo_StringValue.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_StringValue proto.InternalMessageInfo
+
+func (m *StringValue) GetValue() string {
+ if m != nil {
+ return m.Value
+ }
+ return ""
+}
+
+// equivalent java Int32Value
+type Int32Value struct {
+ Value int32 `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *Int32Value) Reset() { *m = Int32Value{} }
+func (m *Int32Value) String() string { return proto.CompactTextString(m) }
+func (*Int32Value) ProtoMessage() {}
+func (*Int32Value) Descriptor() ([]byte, []int) {
+ return fileDescriptor_434bbf44284586dc, []int{1}
+}
+
+func (m *Int32Value) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_Int32Value.Unmarshal(m, b)
+}
+func (m *Int32Value) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_Int32Value.Marshal(b, m, deterministic)
+}
+func (m *Int32Value) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_Int32Value.Merge(m, src)
+}
+func (m *Int32Value) XXX_Size() int {
+ return xxx_messageInfo_Int32Value.Size(m)
+}
+func (m *Int32Value) XXX_DiscardUnknown() {
+ xxx_messageInfo_Int32Value.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Int32Value proto.InternalMessageInfo
+
+func (m *Int32Value) GetValue() int32 {
+ if m != nil {
+ return m.Value
+ }
+ return 0
+}
+
+// equivalent java MapValue
+type Map struct {
+ Attachments map[string]string `protobuf:"bytes,1,rep,name=attachments,proto3" json:"attachments,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *Map) Reset() { *m = Map{} }
+func (m *Map) String() string { return proto.CompactTextString(m) }
+func (*Map) ProtoMessage() {}
+func (*Map) Descriptor() ([]byte, []int) {
+ return fileDescriptor_434bbf44284586dc, []int{2}
+}
+
+func (m *Map) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_Map.Unmarshal(m, b)
+}
+func (m *Map) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_Map.Marshal(b, m, deterministic)
+}
+func (m *Map) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_Map.Merge(m, src)
+}
+func (m *Map) XXX_Size() int {
+ return xxx_messageInfo_Map.Size(m)
+}
+func (m *Map) XXX_DiscardUnknown() {
+ xxx_messageInfo_Map.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Map proto.InternalMessageInfo
+
+func (m *Map) GetAttachments() map[string]string {
+ if m != nil {
+ return m.Attachments
+ }
+ return nil
+}
+
+// copied from dubbo GenericProtobufObjectOutput.java
+// Messages used for transporting debug information between server and client.
+// An element in a stack trace, based on the Java type of the same name.
+//
+// See: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/StackTraceElement.html
+type StackTraceElementProto struct {
+ // The fully qualified name of the class containing the execution point
+ // represented by the stack trace element.
+ ClassName string `protobuf:"bytes,1,opt,name=class_name,json=className,proto3" json:"class_name,omitempty"`
+ // The name of the method containing the execution point represented by the
+ // stack trace element
+ MethodName string `protobuf:"bytes,2,opt,name=method_name,json=methodName,proto3" json:"method_name,omitempty"`
+ // The name of the file containing the execution point represented by the
+ // stack trace element, or null if this information is unavailable.
+ FileName string `protobuf:"bytes,3,opt,name=file_name,json=fileName,proto3" json:"file_name,omitempty"`
+ // The line number of the source line containing the execution point represented
+ // by this stack trace element, or a negative number if this information is
+ // unavailable.
+ LineNumber int32 `protobuf:"varint,4,opt,name=line_number,json=lineNumber,proto3" json:"line_number,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *StackTraceElementProto) Reset() { *m = StackTraceElementProto{} }
+func (m *StackTraceElementProto) String() string { return proto.CompactTextString(m) }
+func (*StackTraceElementProto) ProtoMessage() {}
+func (*StackTraceElementProto) Descriptor() ([]byte, []int) {
+ return fileDescriptor_434bbf44284586dc, []int{3}
+}
+
+func (m *StackTraceElementProto) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_StackTraceElementProto.Unmarshal(m, b)
+}
+func (m *StackTraceElementProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_StackTraceElementProto.Marshal(b, m, deterministic)
+}
+func (m *StackTraceElementProto) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_StackTraceElementProto.Merge(m, src)
+}
+func (m *StackTraceElementProto) XXX_Size() int {
+ return xxx_messageInfo_StackTraceElementProto.Size(m)
+}
+func (m *StackTraceElementProto) XXX_DiscardUnknown() {
+ xxx_messageInfo_StackTraceElementProto.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_StackTraceElementProto proto.InternalMessageInfo
+
+func (m *StackTraceElementProto) GetClassName() string {
+ if m != nil {
+ return m.ClassName
+ }
+ return ""
+}
+
+func (m *StackTraceElementProto) GetMethodName() string {
+ if m != nil {
+ return m.MethodName
+ }
+ return ""
+}
+
+func (m *StackTraceElementProto) GetFileName() string {
+ if m != nil {
+ return m.FileName
+ }
+ return ""
+}
+
+func (m *StackTraceElementProto) GetLineNumber() int32 {
+ if m != nil {
+ return m.LineNumber
+ }
+ return 0
+}
+
+// An exception that was thrown by some code, based on the Java type of the same name.
+//
+// See: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Throwable.html
+type ThrowableProto struct {
+ // The name of the class of the exception that was actually thrown. Downstream readers
+ // of this message may or may not have the actual class available to initialize, so
+ // this is just used to prefix the message of a generic exception type.
+ OriginalClassName string `protobuf:"bytes,1,opt,name=original_class_name,json=originalClassName,proto3" json:"original_class_name,omitempty"`
+ // The message of this throwable. Not filled if there is no message.
+ OriginalMessage string `protobuf:"bytes,2,opt,name=original_message,json=originalMessage,proto3" json:"original_message,omitempty"`
+ // The stack trace of this Throwable.
+ StackTrace []*StackTraceElementProto `protobuf:"bytes,3,rep,name=stack_trace,json=stackTrace,proto3" json:"stack_trace,omitempty"`
+ // The cause of this Throwable. Not filled if there is no cause.
+ Cause *ThrowableProto `protobuf:"bytes,4,opt,name=cause,proto3" json:"cause,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *ThrowableProto) Reset() { *m = ThrowableProto{} }
+func (m *ThrowableProto) String() string { return proto.CompactTextString(m) }
+func (*ThrowableProto) ProtoMessage() {}
+func (*ThrowableProto) Descriptor() ([]byte, []int) {
+ return fileDescriptor_434bbf44284586dc, []int{4}
+}
+
+func (m *ThrowableProto) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_ThrowableProto.Unmarshal(m, b)
+}
+func (m *ThrowableProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_ThrowableProto.Marshal(b, m, deterministic)
+}
+func (m *ThrowableProto) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_ThrowableProto.Merge(m, src)
+}
+func (m *ThrowableProto) XXX_Size() int {
+ return xxx_messageInfo_ThrowableProto.Size(m)
+}
+func (m *ThrowableProto) XXX_DiscardUnknown() {
+ xxx_messageInfo_ThrowableProto.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ThrowableProto proto.InternalMessageInfo
+
+func (m *ThrowableProto) GetOriginalClassName() string {
+ if m != nil {
+ return m.OriginalClassName
+ }
+ return ""
+}
+
+func (m *ThrowableProto) GetOriginalMessage() string {
+ if m != nil {
+ return m.OriginalMessage
+ }
+ return ""
+}
+
+func (m *ThrowableProto) GetStackTrace() []*StackTraceElementProto {
+ if m != nil {
+ return m.StackTrace
+ }
+ return nil
+}
+
+func (m *ThrowableProto) GetCause() *ThrowableProto {
+ if m != nil {
+ return m.Cause
+ }
+ return nil
+}
+
+func init() {
+ proto.RegisterType((*StringValue)(nil), "StringValue")
+ proto.RegisterType((*Int32Value)(nil), "Int32Value")
+ proto.RegisterType((*Map)(nil), "Map")
+ proto.RegisterMapType((map[string]string)(nil), "Map.AttachmentsEntry")
+ proto.RegisterType((*StackTraceElementProto)(nil), "StackTraceElementProto")
+ proto.RegisterType((*ThrowableProto)(nil), "ThrowableProto")
+}
+
+func init() { proto.RegisterFile("proto/payload.proto", fileDescriptor_434bbf44284586dc) }
+
+var fileDescriptor_434bbf44284586dc = []byte{
+ // 353 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x92, 0x4f, 0x4f, 0xea, 0x40,
+ 0x14, 0xc5, 0x53, 0xfa, 0x78, 0x79, 0xdc, 0x26, 0x0f, 0x1c, 0xfc, 0xd3, 0x68, 0x8c, 0xa4, 0xc6,
+ 0x04, 0x37, 0x35, 0x81, 0x85, 0xc4, 0x85, 0x89, 0x31, 0x2c, 0x5c, 0x40, 0x4c, 0x21, 0x6e, 0x9b,
+ 0x4b, 0x19, 0xa1, 0x61, 0x3a, 0x6d, 0x66, 0x06, 0x0d, 0x1b, 0x3f, 0x86, 0x9f, 0xca, 0x0f, 0x65,
+ 0x66, 0xc6, 0x02, 0x2a, 0xbb, 0x99, 0xdf, 0x39, 0xbd, 0x3d, 0xf7, 0x64, 0xa0, 0x59, 0x88, 0x5c,
+ 0xe5, 0x57, 0x05, 0xae, 0x58, 0x8e, 0xd3, 0xd0, 0xdc, 0x82, 0x73, 0xf0, 0x46, 0x4a, 0xa4, 0x7c,
+ 0xf6, 0x84, 0x6c, 0x49, 0xc9, 0x3e, 0x54, 0x5f, 0xf4, 0xc1, 0x77, 0x5a, 0x4e, 0xbb, 0x16, 0xd9,
+ 0x4b, 0x10, 0x00, 0x3c, 0x70, 0xd5, 0xed, 0xec, 0xf0, 0x54, 0x4b, 0xcf, 0x1b, 0xb8, 0x03, 0x2c,
+ 0xc8, 0x35, 0x78, 0xa8, 0x14, 0x26, 0xf3, 0x8c, 0x72, 0x25, 0x7d, 0xa7, 0xe5, 0xb6, 0xbd, 0xce,
+ 0x41, 0x38, 0xc0, 0x22, 0xbc, 0xdb, 0xf0, 0x3e, 0x57, 0x62, 0x15, 0x6d, 0x3b, 0x8f, 0x6f, 0xa1,
+ 0xf1, 0xd3, 0x40, 0x1a, 0xe0, 0x2e, 0xe8, 0xea, 0x2b, 0x8b, 0x3e, 0x6e, 0xfe, 0x5d, 0xd9, 0xca,
+ 0x77, 0x53, 0xe9, 0x39, 0xc1, 0xbb, 0x03, 0x87, 0x23, 0x85, 0xc9, 0x62, 0x2c, 0x30, 0xa1, 0x7d,
+ 0x46, 0xf5, 0x9c, 0x47, 0xbd, 0x23, 0x39, 0x05, 0x48, 0x18, 0x4a, 0x19, 0x73, 0xcc, 0xca, 0xcd,
+ 0x6a, 0x86, 0x0c, 0x31, 0xa3, 0xe4, 0x0c, 0xbc, 0x8c, 0xaa, 0x79, 0x3e, 0xb5, 0xba, 0x9d, 0x0c,
+ 0x16, 0x19, 0xc3, 0x09, 0xd4, 0x9e, 0x53, 0x46, 0xad, 0xec, 0x1a, 0xf9, 0x9f, 0x06, 0xe5, 0xd7,
+ 0x2c, 0xe5, 0x34, 0xe6, 0xcb, 0x6c, 0x42, 0x85, 0xff, 0xc7, 0x74, 0x02, 0x1a, 0x0d, 0x0d, 0x09,
+ 0x3e, 0x1c, 0xf8, 0x3f, 0x9e, 0x8b, 0xfc, 0x15, 0x27, 0x8c, 0xda, 0x40, 0x21, 0x34, 0x73, 0x91,
+ 0xce, 0x52, 0x8e, 0x2c, 0xfe, 0x95, 0x6c, 0xaf, 0x94, 0xee, 0xd7, 0x09, 0x2f, 0xa1, 0xb1, 0xf6,
+ 0x67, 0x54, 0x4a, 0x9c, 0x95, 0x31, 0xeb, 0x25, 0x1f, 0x58, 0x4c, 0x7a, 0xe0, 0x49, 0xdd, 0x42,
+ 0xac, 0x74, 0x0d, 0xbe, 0x6b, 0xfa, 0x3f, 0x0a, 0x77, 0x37, 0x13, 0x81, 0x5c, 0x73, 0x72, 0x01,
+ 0xd5, 0x04, 0x97, 0x92, 0x9a, 0x15, 0xbc, 0x4e, 0x3d, 0xfc, 0x1e, 0x3a, 0xb2, 0xea, 0xe4, 0xaf,
+ 0x79, 0x37, 0xdd, 0xcf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x04, 0x4d, 0x68, 0x3a, 0x4e, 0x02, 0x00,
+ 0x00,
+}
diff --git a/protocol/dubbo/proto/payload.proto b/protocol/dubbo/proto/payload.proto
new file mode 100644
index 0000000..19f644e
--- /dev/null
+++ b/protocol/dubbo/proto/payload.proto
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+// equivalent java StringValue
+message StringValue {
+ string value = 1;
+}
+
+// equivalent java Int32Value
+message Int32Value {
+ int32 value = 1;
+}
+
+// equivalent java MapValue
+message Map {
+ map<string, string> attachments = 1;
+}
+
+// copied from dubbo GenericProtobufObjectOutput.java
+// Messages used for transporting debug information between server and client.
+// An element in a stack trace, based on the Java type of the same name.
+//
+// See: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/StackTraceElement.html
+message StackTraceElementProto {
+ // The fully qualified name of the class containing the execution point
+ // represented by the stack trace element.
+ string class_name = 1;
+
+ // The name of the method containing the execution point represented by the
+ // stack trace element
+ string method_name = 2;
+
+ // The name of the file containing the execution point represented by the
+ // stack trace element, or null if this information is unavailable.
+ string file_name = 3;
+
+ // The line number of the source line containing the execution point represented
+ // by this stack trace element, or a negative number if this information is
+ // unavailable.
+ int32 line_number = 4;
+}
+
+// An exception that was thrown by some code, based on the Java type of the same name.
+//
+// See: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Throwable.html
+message ThrowableProto {
+ // The name of the class of the exception that was actually thrown. Downstream readers
+ // of this message may or may not have the actual class available to initialize, so
+ // this is just used to prefix the message of a generic exception type.
+ string original_class_name = 1;
+
+ // The message of this throwable. Not filled if there is no message.
+ string original_message = 2;
+
+ // The stack trace of this Throwable.
+ repeated StackTraceElementProto stack_trace = 3;
+
+ // The cause of this Throwable. Not filled if there is no cause.
+ ThrowableProto cause = 4;
+}
+
+
diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go
index b5c4f50..9841b81 100644
--- a/protocol/dubbo/readwriter.go
+++ b/protocol/dubbo/readwriter.go
@@ -18,7 +18,6 @@
package dubbo
import (
- "bytes"
"reflect"
)
@@ -29,8 +28,8 @@ import (
)
import (
- "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
)
@@ -49,42 +48,58 @@ func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler {
}
func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
- pkg := &DubboPackage{}
-
- buf := bytes.NewBuffer(data)
- err := pkg.Unmarshal(buf, p.client)
- if err != nil {
+ pkg := NewClientResponsePackage(data)
+ if err := pkg.ReadHeader(); err != nil {
originErr := perrors.Cause(err)
if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
return nil, 0, nil
}
-
- logger.Errorf("pkg.Unmarshal(ss:%+v, len(@data):%d) = error:%+v", ss, len(data), err)
-
+ logger.Errorf("[RpcClientPackageHandler.Read] ss:%+v, len(@data):%d) = error:%+v ", ss, len(data), err)
return nil, 0, perrors.WithStack(err)
}
+ if pkg.IsHeartBeat() {
+ // heartbeat package doesn't need deserialize
+ return pkg, pkg.GetLen(), nil
+ }
- if pkg.Header.Type&hessian.PackageRequest == 0x00 {
- pkg.Err = pkg.Body.(*hessian.Response).Exception
- pkg.Body = NewResponse(pkg.Body.(*hessian.Response).RspObj, pkg.Body.(*hessian.Response).Attachments)
+ if err := loadSerializer(pkg); err != nil {
+ return nil, 0, err
}
- return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil
+ // load response
+ pendingRsp, ok := p.client.pendingResponses.Load(SequenceType(pkg.GetHeader().ID))
+ if !ok {
+ return nil, 0, perrors.Errorf("client.GetPendingResopnse(%v) = nil", pkg.GetHeader().ID)
+ }
+ // set package body
+ body := NewResponsePayload(pendingRsp.(*PendingResponse).response.reply, nil, nil)
+ pkg.SetBody(body)
+ err := pkg.Unmarshal()
+ if err != nil {
+ return nil, 0, perrors.WithStack(err)
+ }
+ resp := pkg.Body.(*ResponsePayload)
+ pkg.Err = resp.Exception
+ pkg.Body = NewResponse(resp.RspObj, resp.Attachments)
+ return pkg, pkg.GetLen(), nil
}
func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) {
req, ok := pkg.(*DubboPackage)
if !ok {
- logger.Errorf("illegal pkg:%+v\n", pkg)
return nil, perrors.New("invalid rpc request")
}
-
buf, err := req.Marshal()
if err != nil {
logger.Warnf("binary.Write(req{%#v}) = err{%#v}", req, perrors.WithStack(err))
return nil, perrors.WithStack(err)
}
-
return buf.Bytes(), nil
}
@@ -96,16 +111,33 @@ var (
rpcServerPkgHandler = &RpcServerPackageHandler{}
)
// RpcServerPackageHandler ...
type RpcServerPackageHandler struct{}
func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
- pkg := &DubboPackage{
- Body: make([]interface{}, 7),
+ pkg := NewServerRequestPackage(data)
+ if err := pkg.ReadHeader(); err != nil {
+ originErr := perrors.Cause(err)
+ if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
+ return nil, 0, nil
+ }
+ return nil, 0, perrors.WithStack(err)
+ }
+
+ if pkg.IsHeartBeat() {
+ return pkg, pkg.GetLen(), nil
+ }
+
+ if err := loadSerializer(pkg); err != nil {
+ return nil, 0, err
}
- buf := bytes.NewBuffer(data)
- err := pkg.Unmarshal(buf)
+ err := pkg.Unmarshal()
if err != nil {
originErr := perrors.Cause(err)
if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
@@ -113,60 +145,9 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface
}
logger.Errorf("pkg.Unmarshal(ss:%+v, len(@data):%d) = error:%+v", ss, len(data), err)
-
return nil, 0, perrors.WithStack(err)
}
-
- if pkg.Header.Type&hessian.PackageHeartbeat == 0x00 {
- // convert params of request
- req := pkg.Body.([]interface{}) // length of body should be 7
- if len(req) > 0 {
- var dubboVersion, argsTypes string
- var args []interface{}
- var attachments map[string]string
- if req[0] != nil {
- dubboVersion = req[0].(string)
- }
- if req[1] != nil {
- pkg.Service.Path = req[1].(string)
- }
- if req[2] != nil {
- pkg.Service.Version = req[2].(string)
- }
- if req[3] != nil {
- pkg.Service.Method = req[3].(string)
- }
- if req[4] != nil {
- argsTypes = req[4].(string)
- }
- if req[5] != nil {
- args = req[5].([]interface{})
- }
- if req[6] != nil {
- attachments = req[6].(map[string]string)
- }
- if pkg.Service.Path == "" && len(attachments[constant.PATH_KEY]) > 0 {
- pkg.Service.Path = attachments[constant.PATH_KEY]
- }
- if _, ok := attachments[constant.INTERFACE_KEY]; ok {
- pkg.Service.Interface = attachments[constant.INTERFACE_KEY]
- } else {
- pkg.Service.Interface = pkg.Service.Path
- }
- if len(attachments[constant.GROUP_KEY]) > 0 {
- pkg.Service.Group = attachments[constant.GROUP_KEY]
- }
- pkg.Body = map[string]interface{}{
- "dubboVersion": dubboVersion,
- "argsTypes": argsTypes,
- "args": args,
- "service": common.ServiceMap.GetService(DUBBO, pkg.Service.Path), // path as a key
- "attachments": attachments,
- }
- }
- }
-
- return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil
+ return pkg, pkg.GetLen(), nil
}
func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) {
@@ -175,12 +156,24 @@ func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) ([]by
logger.Errorf("illegal pkg:%+v\n, it is %+v", pkg, reflect.TypeOf(pkg))
return nil, perrors.New("invalid rpc response")
}
-
buf, err := res.Marshal()
if err != nil {
logger.Warnf("binary.Write(res{%#v}) = err{%#v}", res, perrors.WithStack(err))
return nil, perrors.WithStack(err)
}
-
return buf.Bytes(), nil
}
+
+// loadSerializer looks up the serializer matching the serialization ID found in
+// p's header and installs it on p through SetSerializer.
+//
+// A zero SerialID is replaced by constant.S_Hessian2 before the lookup. An error
+// is returned when extension.GetSerializerById fails, or when the value it
+// returns does not implement Serializer. The latter used to be an unchecked
+// type assertion, which panics on a nil or mismatching value.
+func loadSerializer(p *DubboPackage) error {
+	// NOTE: default serialID is S_Hessian2
+	serialID := p.Header.SerialID
+	if serialID == 0 {
+		serialID = constant.S_Hessian2
+	}
+	registered, err := extension.GetSerializerById(serialID)
+	if err != nil {
+		return err
+	}
+	// Use the comma-ok form so a bad registration surfaces as an error
+	// instead of crashing the goroutine that is decoding the package.
+	serializer, ok := registered.(Serializer)
+	if !ok {
+		return perrors.Errorf("serializer registered for id %v has type %T, which does not implement Serializer", serialID, registered)
+	}
+	p.SetSerializer(serializer)
+	return nil
+}
diff --git a/protocol/dubbo/request.go b/protocol/dubbo/request.go
new file mode 100644
index 0000000..54568df
--- /dev/null
+++ b/protocol/dubbo/request.go
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dubbo
+
+// RequestPayload pairs the parameters of a request with its attachments.
+// Instances are normally built with NewRequestPayload or EnsureRequestPayload.
+type RequestPayload struct {
+ // Params holds the value that was passed as args to NewRequestPayload.
+ Params interface{}
+ // Attachments holds string key/value pairs; NewRequestPayload replaces a
+ // nil map with an empty one, so payloads built that way never carry nil here.
+ Attachments map[string]string
+}
+
+// NewRequestPayload returns a RequestPayload wrapping args and atta.
+// When atta is nil an empty map is stored instead, so the Attachments field of
+// the returned payload can always be written to.
+func NewRequestPayload(args interface{}, atta map[string]string) *RequestPayload {
+	payload := &RequestPayload{Params: args, Attachments: atta}
+	if payload.Attachments == nil {
+		payload.Attachments = make(map[string]string)
+	}
+	return payload
+}
+
+func EnsureRequestPayload(body interface{}) *RequestPayload {
+ if req, ok := body.(*RequestPayload); ok {
+ return req
+ }
+ return NewRequestPayload(body, nil)
+}
diff --git a/protocol/dubbo/response.go b/protocol/dubbo/response.go
new file mode 100644
index 0000000..95dcc98
--- /dev/null
+++ b/protocol/dubbo/response.go
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dubbo
+
+// ResponsePayload groups a response object, an optional error and attachments.
+// Instances are normally built with NewResponsePayload or EnsureResponsePayload.
+type ResponsePayload struct {
+ // RspObj holds the value that was passed as rspObj to NewResponsePayload.
+ RspObj interface{}
+ // Exception holds the error passed to NewResponsePayload; nil when there is none.
+ Exception error
+ // Attachments holds string key/value pairs; NewResponsePayload replaces a
+ // nil map with an empty one.
+ Attachments map[string]string
+}
+
+// NewResponsePayload creates a ResponsePayload from rspObj, exception and
+// attachments. A nil attachments map is replaced by an empty one so the
+// Attachments field of the result can always be written to.
+func NewResponsePayload(rspObj interface{}, exception error, attachments map[string]string) *ResponsePayload {
+	payload := &ResponsePayload{
+		RspObj:      rspObj,
+		Exception:   exception,
+		Attachments: attachments,
+	}
+	if payload.Attachments == nil {
+		payload.Attachments = make(map[string]string)
+	}
+	return payload
+}
+
+func EnsureResponsePayload(body interface{}) *ResponsePayload {
+ if res, ok := body.(*ResponsePayload); ok {
+ return res
+ }
+ if exp, ok := body.(error); ok {
+ return NewResponsePayload(nil, exp, nil)
+ }
+ return NewResponsePayload(body, nil, nil)
+}
diff --git a/protocol/dubbo/serialize.go b/protocol/dubbo/serialize.go
new file mode 100644
index 0000000..4da3947
--- /dev/null
+++ b/protocol/dubbo/serialize.go
@@ -0,0 +1,6 @@
+package dubbo
+
+// Serializer is the contract a serialization implementation must satisfy to be
+// attached to a DubboPackage.
+// NOTE(review): presumably Marshal/Unmarshal encode and decode the package
+// payload for the wire; which parts of the package are covered is up to each
+// implementation — confirm against the concrete serializers.
+type Serializer interface {
+ // Marshal takes the package by value and returns its encoded bytes or an error.
+ Marshal(p DubboPackage) ([]byte, error)
+ // Unmarshal reads the given bytes into the supplied package, reporting an
+ // error on failure.
+ Unmarshal([]byte, *DubboPackage) error
+}
diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go
index e13443d..c5252a9 100644
--- a/registry/zookeeper/registry.go
+++ b/registry/zookeeper/registry.go
@@ -218,7 +218,8 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen
//Interested register to dataconfig.
r.dataListener.AddInterestedURL(conf)
for _, v := range strings.Split(conf.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), ",") {
- go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+v, url.QueryEscape(conf.Service())), r.dataListener)
+ u := common.URL{Path: fmt.Sprintf("/dubbo/%s/"+v, url.QueryEscape(conf.Service()))}
+ go r.listener.ListenServiceEvent(u.Path, r.dataListener)
}
return zkListener, nil