You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by fa...@apache.org on 2020/07/26 08:28:51 UTC
[dubbo-go] 01/04: Fix: resolve conflicts
This is an automated email from the ASF dual-hosted git repository.
fangyc pushed a commit to branch refact-seri
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
commit 6b93bbc4f7ecf046098ac3beaaf10e4be5f40980
Author: fangyincheng <fa...@sina.com>
AuthorDate: Sat Jun 27 18:14:50 2020 +0800
Fix: resolve conflicts
---
common/constant/default.go | 1 +
common/constant/key.go | 1 +
common/constant/serializtion.go | 28 ++
common/url.go | 27 ++
config/service_config.go | 4 +-
go.mod | 1 +
protocol/dubbo/client.go | 301 +-----------
protocol/dubbo/client_test.go | 101 ++--
protocol/dubbo/codec.go | 157 -------
protocol/dubbo/codec_test.go | 83 ----
protocol/dubbo/dubbo_invoker.go | 21 +-
protocol/dubbo/dubbo_invoker_test.go | 15 +-
protocol/dubbo/dubbo_protocol.go | 11 +-
protocol/dubbo/dubbo_protocol_test.go | 15 +-
protocol/dubbo/impl/codec.go | 299 ++++++++++++
protocol/dubbo/impl/codec_test.go | 197 ++++++++
protocol/dubbo/impl/const.go | 243 ++++++++++
protocol/dubbo/impl/hessian.go | 508 +++++++++++++++++++++
protocol/dubbo/impl/package.go | 171 +++++++
protocol/dubbo/impl/proto.go | 450 ++++++++++++++++++
protocol/dubbo/impl/proto/payload.pb.go | 345 ++++++++++++++
protocol/dubbo/impl/proto/payload.proto | 78 ++++
.../{client.go => impl/remoting/client_impl.go} | 266 ++++++-----
protocol/dubbo/{ => impl/remoting}/config.go | 48 +-
protocol/dubbo/impl/remoting/errors.go | 17 +
protocol/dubbo/{ => impl/remoting}/pool.go | 27 +-
protocol/dubbo/{ => impl/remoting}/readwriter.go | 163 +++----
.../{server.go => impl/remoting/server_impl.go} | 86 +---
.../remoting/server_listener.go} | 173 +++----
protocol/dubbo/impl/request.go | 40 ++
protocol/dubbo/impl/response.go | 46 ++
protocol/dubbo/impl/serialization.go | 54 +++
protocol/dubbo/impl/serialize.go | 40 ++
protocol/dubbo/server.go | 173 +++----
.../dubbo/{listener_test.go => server_test.go} | 4 +-
test/integrate/dubbo/go-server/server.go | 2 +-
36 files changed, 3056 insertions(+), 1140 deletions(-)
diff --git a/common/constant/default.go b/common/constant/default.go
index c69989b..dd75691 100644
--- a/common/constant/default.go
+++ b/common/constant/default.go
@@ -44,6 +44,7 @@ const (
DEFAULT_REST_CLIENT = "resty"
DEFAULT_REST_SERVER = "go-restful"
DEFAULT_PORT = 20000
+ DEFAULT_SERIALIZATION = HESSIAN2_SERIALIZATION
)
const (
diff --git a/common/constant/key.go b/common/constant/key.go
index 5be63fe..6e73183 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -77,6 +77,7 @@ const (
EXECUTE_REJECTED_EXECUTION_HANDLER_KEY = "execute.limit.rejected.handler"
PROVIDER_SHUTDOWN_FILTER = "pshutdown"
CONSUMER_SHUTDOWN_FILTER = "cshutdown"
+ SERIALIZATION_KEY = "serialization"
)
const (
diff --git a/common/constant/serializtion.go b/common/constant/serializtion.go
new file mode 100644
index 0000000..f27598c
--- /dev/null
+++ b/common/constant/serializtion.go
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package constant
+
+const (
+ S_Hessian2 byte = 2
+ S_Proto byte = 21
+)
+
+const (
+ HESSIAN2_SERIALIZATION = "hessian2"
+ PROTOBUF_SERIALIZATION = "protobuf"
+)
diff --git a/common/url.go b/common/url.go
index 1cfa47a..51eabff 100644
--- a/common/url.go
+++ b/common/url.go
@@ -23,6 +23,7 @@ import (
"math"
"net"
"net/url"
+ "sort"
"strconv"
"strings"
)
@@ -650,3 +651,29 @@ func mergeNormalParam(mergedUrl *URL, referenceUrl *URL, paramKeys []string) []f
}
return methodConfigMergeFcn
}
+
+// ParamsUnescapeEncode encodes params as "k=v&k=v" sorted by key, like url.Values.Encode in net/url,
+// but writes keys and values verbatim: reserved characters are NOT escaped (use url.QueryEscape for that).
+func ParamsUnescapeEncode(params url.Values) string {
+	if params == nil {
+		return ""
+	}
+	var buf strings.Builder
+	keys := make([]string, 0, len(params)) // length 0, capacity n: append must not follow empty placeholder keys
+	for k := range params {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+	for _, k := range keys {
+		vs := params[k]
+		for _, v := range vs {
+			if buf.Len() > 0 {
+				buf.WriteByte('&')
+			}
+			buf.WriteString(k)
+			buf.WriteByte('=')
+			buf.WriteString(v)
+		}
+	}
+	return buf.String()
+}
diff --git a/config/service_config.go b/config/service_config.go
index 70a344c..a49af18 100644
--- a/config/service_config.go
+++ b/config/service_config.go
@@ -59,6 +59,7 @@ type ServiceConfig struct {
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
+ Serialization string `yaml:"serialization" json:"serialization" property:"serialization"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
Token string `yaml:"token" json:"token,omitempty" property:"token"`
AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"`
@@ -228,7 +229,8 @@ func (c *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))
urlMap.Set(constant.RELEASE_KEY, "dubbo-golang-"+constant.Version)
urlMap.Set(constant.SIDE_KEY, (common.RoleType(common.PROVIDER)).Role())
-
+ // todo: move
+ urlMap.Set(constant.SERIALIZATION_KEY, c.Serialization)
// application info
urlMap.Set(constant.APPLICATION_KEY, providerConfig.ApplicationConfig.Name)
urlMap.Set(constant.ORGANIZATION_KEY, providerConfig.ApplicationConfig.Organization)
diff --git a/go.mod b/go.mod
index 887df01..45d839d 100644
--- a/go.mod
+++ b/go.mod
@@ -38,6 +38,7 @@ require (
github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect
github.com/magiconair/properties v1.8.1
+ github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/mitchellh/mapstructure v1.1.2
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
github.com/nacos-group/nacos-sdk-go v0.0.0-20191128082542-fe1b325b125c
diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go
index 6d1b771..0bd7aac 100644
--- a/protocol/dubbo/client.go
+++ b/protocol/dubbo/client.go
@@ -19,36 +19,17 @@ package dubbo
import (
"math/rand"
- "strings"
- "sync"
"time"
)
import (
- hessian "github.com/apache/dubbo-go-hessian2"
- "github.com/dubbogo/getty"
- gxsync "github.com/dubbogo/gost/sync"
- perrors "github.com/pkg/errors"
- "go.uber.org/atomic"
"gopkg.in/yaml.v2"
)
import (
- "github.com/apache/dubbo-go/common"
- "github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
-)
-
-var (
- errInvalidCodecType = perrors.New("illegal CodecType")
- errInvalidAddress = perrors.New("remote address invalid or empty")
- errSessionNotExist = perrors.New("session not exist")
- errClientClosed = perrors.New("client closed")
- errClientReadTimeout = perrors.New("client read timeout")
-
- clientConf *ClientConfig
- clientGrpool *gxsync.TaskPool
+ "github.com/apache/dubbo-go/protocol/dubbo/impl/remoting"
)
func init() {
@@ -60,7 +41,7 @@ func init() {
return
}
protocolConf := config.GetConsumerConfig().ProtocolConf
- defaultClientConfig := GetDefaultClientConfig()
+ defaultClientConfig := remoting.GetDefaultClientConfig()
if protocolConf == nil {
logger.Info("protocol_conf default use dubbo config")
} else {
@@ -78,286 +59,12 @@ func init() {
panic(err)
}
}
- clientConf = &defaultClientConfig
+ clientConf := &defaultClientConfig
if err := clientConf.CheckValidity(); err != nil {
logger.Warnf("[CheckValidity] error: %v", err)
return
}
- setClientGrpool()
+ remoting.SetClientConf(*clientConf)
rand.Seed(time.Now().UnixNano())
}
-
-// SetClientConf set dubbo client config.
-func SetClientConf(c ClientConfig) {
- clientConf = &c
- err := clientConf.CheckValidity()
- if err != nil {
- logger.Warnf("[ClientConfig CheckValidity] error: %v", err)
- return
- }
- setClientGrpool()
-}
-
-// GetClientConf get dubbo client config.
-func GetClientConf() ClientConfig {
- return *clientConf
-}
-
-func setClientGrpool() {
- if clientConf.GrPoolSize > 1 {
- clientGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen),
- gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber))
- }
-}
-
-// Options is option for create dubbo client
-type Options struct {
- // connect timeout
- ConnectTimeout time.Duration
- // request timeout
- RequestTimeout time.Duration
-}
-
-//AsyncCallbackResponse async response for dubbo
-type AsyncCallbackResponse struct {
- common.CallbackResponse
- Opts Options
- Cause error
- Start time.Time // invoke(call) start time == write start time
- ReadStart time.Time // read start time, write duration = ReadStart - Start
- Reply interface{}
-}
-
-// Client is dubbo protocol client.
-type Client struct {
- opts Options
- conf ClientConfig
- pool *gettyRPCClientPool
- sequence atomic.Uint64
-
- pendingResponses *sync.Map
-}
-
-// NewClient create a new Client.
-func NewClient(opt Options) *Client {
-
- switch {
- case opt.ConnectTimeout == 0:
- opt.ConnectTimeout = 3 * time.Second
- fallthrough
- case opt.RequestTimeout == 0:
- opt.RequestTimeout = 3 * time.Second
- }
-
- // make sure that client request sequence is an odd number
- initSequence := uint64(rand.Int63n(time.Now().UnixNano()))
- if initSequence%2 == 0 {
- initSequence++
- }
-
- c := &Client{
- opts: opt,
- pendingResponses: new(sync.Map),
- conf: *clientConf,
- }
- c.sequence.Store(initSequence)
- c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
-
- return c
-}
-
-// Request is dubbo protocol request.
-type Request struct {
- addr string
- svcUrl common.URL
- method string
- args interface{}
- atta map[string]string
-}
-
-// NewRequest create a new Request.
-func NewRequest(addr string, svcUrl common.URL, method string, args interface{}, atta map[string]string) *Request {
- return &Request{
- addr: addr,
- svcUrl: svcUrl,
- method: method,
- args: args,
- atta: atta,
- }
-}
-
-// Response is dubbo protocol response.
-type Response struct {
- reply interface{}
- atta map[string]string
-}
-
-// NewResponse create a new Response.
-func NewResponse(reply interface{}, atta map[string]string) *Response {
- return &Response{
- reply: reply,
- atta: atta,
- }
-}
-
-// CallOneway call by one way
-func (c *Client) CallOneway(request *Request) error {
-
- return perrors.WithStack(c.call(CT_OneWay, request, NewResponse(nil, nil), nil))
-}
-
-// Call call remoting by two way or one way, if @response.reply is nil, the way of call is one way.
-func (c *Client) Call(request *Request, response *Response) error {
- ct := CT_TwoWay
- if response.reply == nil {
- ct = CT_OneWay
- }
-
- return perrors.WithStack(c.call(ct, request, response, nil))
-}
-
-// AsyncCall call remoting by async with callback.
-func (c *Client) AsyncCall(request *Request, callback common.AsyncCallback, response *Response) error {
- return perrors.WithStack(c.call(CT_TwoWay, request, response, callback))
-}
-
-func (c *Client) call(ct CallType, request *Request, response *Response, callback common.AsyncCallback) error {
- p := &DubboPackage{}
- p.Service.Path = strings.TrimPrefix(request.svcUrl.Path, "/")
- p.Service.Interface = request.svcUrl.GetParam(constant.INTERFACE_KEY, "")
- p.Service.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "")
- p.Service.Group = request.svcUrl.GetParam(constant.GROUP_KEY, "")
- p.Service.Method = request.method
-
- p.Service.Timeout = c.opts.RequestTimeout
- var timeout = request.svcUrl.GetParam(strings.Join([]string{constant.METHOD_KEYS, request.method + constant.RETRIES_KEY}, "."), "")
- if len(timeout) != 0 {
- if t, err := time.ParseDuration(timeout); err == nil {
- p.Service.Timeout = t
- }
- }
-
- p.Header.SerialID = byte(S_Dubbo)
- p.Body = hessian.NewRequest(request.args, request.atta)
-
- var rsp *PendingResponse
- if ct != CT_OneWay {
- p.Header.Type = hessian.PackageRequest_TwoWay
- rsp = NewPendingResponse()
- rsp.response = response
- rsp.callback = callback
- } else {
- p.Header.Type = hessian.PackageRequest
- }
-
- var (
- err error
- session getty.Session
- conn *gettyRPCClient
- )
- conn, session, err = c.selectSession(request.addr)
- if err != nil {
- return perrors.WithStack(err)
- }
- if session == nil {
- return errSessionNotExist
- }
- defer func() {
- if err == nil {
- c.pool.put(conn)
- return
- }
- conn.close()
- }()
-
- if err = c.transfer(session, p, rsp); err != nil {
- return perrors.WithStack(err)
- }
-
- if ct == CT_OneWay || callback != nil {
- return nil
- }
-
- select {
- case <-getty.GetTimeWheel().After(c.opts.RequestTimeout):
- c.removePendingResponse(SequenceType(rsp.seq))
- return perrors.WithStack(errClientReadTimeout)
- case <-rsp.done:
- err = rsp.err
- }
-
- return perrors.WithStack(err)
-}
-
-// Close close the client pool.
-func (c *Client) Close() {
- if c.pool != nil {
- c.pool.close()
- }
- c.pool = nil
-}
-
-func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, error) {
- rpcClient, err := c.pool.getGettyRpcClient(DUBBO, addr)
- if err != nil {
- return nil, nil, perrors.WithStack(err)
- }
- return rpcClient, rpcClient.selectSession(), nil
-}
-
-func (c *Client) heartbeat(session getty.Session) error {
- return c.transfer(session, nil, NewPendingResponse())
-}
-
-func (c *Client) transfer(session getty.Session, pkg *DubboPackage,
- rsp *PendingResponse) error {
-
- var (
- sequence uint64
- err error
- )
-
- sequence = c.sequence.Add(1)
-
- if pkg == nil {
- pkg = &DubboPackage{}
- pkg.Body = hessian.NewRequest([]interface{}{}, nil)
- pkg.Body = []interface{}{}
- pkg.Header.Type = hessian.PackageHeartbeat
- pkg.Header.SerialID = byte(S_Dubbo)
- }
- pkg.Header.ID = int64(sequence)
-
- // cond1
- if rsp != nil {
- rsp.seq = sequence
- c.addPendingResponse(rsp)
- }
-
- err = session.WritePkg(pkg, c.opts.RequestTimeout)
- if err != nil {
- c.removePendingResponse(SequenceType(rsp.seq))
- } else if rsp != nil { // cond2
- // cond2 should not merged with cond1. cause the response package may be returned very
- // soon and it will be handled by other goroutine.
- rsp.readStart = time.Now()
- }
-
- return perrors.WithStack(err)
-}
-
-func (c *Client) addPendingResponse(pr *PendingResponse) {
- c.pendingResponses.Store(SequenceType(pr.seq), pr)
-}
-
-func (c *Client) removePendingResponse(seq SequenceType) *PendingResponse {
- if c.pendingResponses == nil {
- return nil
- }
- if presp, ok := c.pendingResponses.Load(seq); ok {
- c.pendingResponses.Delete(seq)
- return presp.(*PendingResponse)
- }
- return nil
-}
diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go
index 744ffa8..51e532e 100644
--- a/protocol/dubbo/client_test.go
+++ b/protocol/dubbo/client_test.go
@@ -1,18 +1,18 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
*/
package dubbo
@@ -35,23 +35,24 @@ import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/proxy/proxy_factory"
"github.com/apache/dubbo-go/protocol"
+ "github.com/apache/dubbo-go/protocol/dubbo/impl/remoting"
)
func TestClient_CallOneway(t *testing.T) {
proto, url := InitTest(t)
- c := &Client{
- pendingResponses: new(sync.Map),
- conf: *clientConf,
- opts: Options{
+ c := &remoting.Client{
+ PendingResponses: new(sync.Map),
+ Conf: *remoting.GetClientConf(),
+ Opts: remoting.Options{
ConnectTimeout: 3e9,
RequestTimeout: 6e9,
},
}
- c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
+ c.Pool = remoting.NewGettyRPCClientConnPool(c, remoting.GetClientConf().PoolSize, time.Duration(int(time.Second)*remoting.GetClientConf().PoolTTL))
//user := &User{}
- err := c.CallOneway(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil))
+ err := c.CallOneway(remoting.NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil))
assert.NoError(t, err)
// destroy
@@ -61,15 +62,15 @@ func TestClient_CallOneway(t *testing.T) {
func TestClient_Call(t *testing.T) {
proto, url := InitTest(t)
- c := &Client{
- pendingResponses: new(sync.Map),
- conf: *clientConf,
- opts: Options{
+ c := &remoting.Client{
+ PendingResponses: new(sync.Map),
+ Conf: *remoting.GetClientConf(),
+ Opts: remoting.Options{
ConnectTimeout: 3e9,
RequestTimeout: 10e9,
},
}
- c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
+ c.Pool = remoting.NewGettyRPCClientConnPool(c, remoting.GetClientConf().PoolSize, time.Duration(int(time.Second)*remoting.GetClientConf().PoolTTL))
var (
user *User
@@ -77,50 +78,50 @@ func TestClient_Call(t *testing.T) {
)
user = &User{}
- err = c.Call(NewRequest("127.0.0.1:20000", url, "GetBigPkg", []interface{}{nil}, nil), NewResponse(user, nil))
+ err = c.Call(remoting.NewRequest("127.0.0.1:20000", url, "GetBigPkg", []interface{}{nil}, nil), remoting.NewResponse(user, nil))
assert.NoError(t, err)
assert.NotEqual(t, "", user.Id)
assert.NotEqual(t, "", user.Name)
user = &User{}
- err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), NewResponse(user, nil))
+ err = c.Call(remoting.NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), remoting.NewResponse(user, nil))
assert.NoError(t, err)
assert.Equal(t, User{Id: "1", Name: "username"}, *user)
user = &User{}
- err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser0", []interface{}{"1", nil, "username"}, nil), NewResponse(user, nil))
+ err = c.Call(remoting.NewRequest("127.0.0.1:20000", url, "GetUser0", []interface{}{"1", nil, "username"}, nil), remoting.NewResponse(user, nil))
assert.NoError(t, err)
assert.Equal(t, User{Id: "1", Name: "username"}, *user)
- err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser1", []interface{}{}, nil), NewResponse(user, nil))
+ err = c.Call(remoting.NewRequest("127.0.0.1:20000", url, "GetUser1", []interface{}{}, nil), remoting.NewResponse(user, nil))
assert.NoError(t, err)
- err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser2", []interface{}{}, nil), NewResponse(user, nil))
+ err = c.Call(remoting.NewRequest("127.0.0.1:20000", url, "GetUser2", []interface{}{}, nil), remoting.NewResponse(user, nil))
assert.EqualError(t, err, "error")
user2 := []interface{}{}
- err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser3", []interface{}{}, nil), NewResponse(&user2, nil))
+ err = c.Call(remoting.NewRequest("127.0.0.1:20000", url, "GetUser3", []interface{}{}, nil), remoting.NewResponse(&user2, nil))
assert.NoError(t, err)
assert.Equal(t, &User{Id: "1", Name: "username"}, user2[0])
user2 = []interface{}{}
- err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser4", []interface{}{[]interface{}{"1", "username"}}, nil), NewResponse(&user2, nil))
+ err = c.Call(remoting.NewRequest("127.0.0.1:20000", url, "GetUser4", []interface{}{[]interface{}{"1", "username"}}, nil), remoting.NewResponse(&user2, nil))
assert.NoError(t, err)
assert.Equal(t, &User{Id: "1", Name: "username"}, user2[0])
user3 := map[interface{}]interface{}{}
- err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser5", []interface{}{map[interface{}]interface{}{"id": "1", "name": "username"}}, nil), NewResponse(&user3, nil))
+ err = c.Call(remoting.NewRequest("127.0.0.1:20000", url, "GetUser5", []interface{}{map[interface{}]interface{}{"id": "1", "name": "username"}}, nil), remoting.NewResponse(&user3, nil))
assert.NoError(t, err)
assert.NotNil(t, user3)
assert.Equal(t, &User{Id: "1", Name: "username"}, user3["key"])
user = &User{}
- err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser6", []interface{}{0}, nil), NewResponse(user, nil))
+ err = c.Call(remoting.NewRequest("127.0.0.1:20000", url, "GetUser6", []interface{}{0}, nil), remoting.NewResponse(user, nil))
assert.NoError(t, err)
assert.Equal(t, User{Id: "", Name: ""}, *user)
user = &User{}
- err = c.Call(NewRequest("127.0.0.1:20000", url, "GetUser6", []interface{}{1}, nil), NewResponse(user, nil))
+ err = c.Call(remoting.NewRequest("127.0.0.1:20000", url, "GetUser6", []interface{}{1}, nil), remoting.NewResponse(user, nil))
assert.NoError(t, err)
assert.Equal(t, User{Id: "1", Name: ""}, *user)
@@ -131,24 +132,24 @@ func TestClient_Call(t *testing.T) {
func TestClient_AsyncCall(t *testing.T) {
proto, url := InitTest(t)
- c := &Client{
- pendingResponses: new(sync.Map),
- conf: *clientConf,
- opts: Options{
+ c := &remoting.Client{
+ PendingResponses: new(sync.Map),
+ Conf: *remoting.GetClientConf(),
+ Opts: remoting.Options{
ConnectTimeout: 3e9,
RequestTimeout: 6e9,
},
}
- c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
+ c.Pool = remoting.NewGettyRPCClientConnPool(c, remoting.GetClientConf().PoolSize, time.Duration(int(time.Second)*remoting.GetClientConf().PoolTTL))
user := &User{}
lock := sync.Mutex{}
lock.Lock()
- err := c.AsyncCall(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response common.CallbackResponse) {
- r := response.(AsyncCallbackResponse)
- assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User))
+ err := c.AsyncCall(remoting.NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response common.CallbackResponse) {
+ r := response.(remoting.AsyncCallbackResponse)
+ assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*remoting.Response).Reply.(*User))
lock.Unlock()
- }, NewResponse(user, nil))
+ }, remoting.NewResponse(user, nil))
assert.NoError(t, err)
assert.Equal(t, User{}, *user)
@@ -167,13 +168,13 @@ func InitTest(t *testing.T) (protocol.Protocol, common.URL) {
assert.Equal(t, "GetBigPkg,GetUser,GetUser0,GetUser1,GetUser2,GetUser3,GetUser4,GetUser5,GetUser6", methods)
// config
- SetClientConf(ClientConfig{
+ remoting.SetClientConf(remoting.ClientConfig{
ConnectionNum: 2,
HeartbeatPeriod: "5s",
SessionTimeout: "20s",
PoolTTL: 600,
PoolSize: 64,
- GettySessionParam: GettySessionParam{
+ GettySessionParam: remoting.GettySessionParam{
CompressEncoding: false,
TcpNoDelay: true,
TcpKeepAlive: true,
@@ -188,11 +189,11 @@ func InitTest(t *testing.T) (protocol.Protocol, common.URL) {
SessionName: "client",
},
})
- assert.NoError(t, clientConf.CheckValidity())
- SetServerConfig(ServerConfig{
+ assert.NoError(t, remoting.GetClientConf().CheckValidity())
+ remoting.SetServerConfig(remoting.ServerConfig{
SessionNumber: 700,
SessionTimeout: "20s",
- GettySessionParam: GettySessionParam{
+ GettySessionParam: remoting.GettySessionParam{
CompressEncoding: false,
TcpNoDelay: true,
TcpKeepAlive: true,
@@ -206,7 +207,7 @@ func InitTest(t *testing.T) (protocol.Protocol, common.URL) {
MaxMsgLen: 10240000000,
SessionName: "server",
}})
- assert.NoError(t, srvConf.CheckValidity())
+ assert.NoError(t, remoting.GetServerConfig().CheckValidity())
// Export
proto := GetProtocol()
diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go
deleted file mode 100644
index 1f7d107..0000000
--- a/protocol/dubbo/codec.go
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package dubbo
-
-import (
- "bufio"
- "bytes"
- "fmt"
- "time"
-)
-
-import (
- "github.com/apache/dubbo-go-hessian2"
- "github.com/apache/dubbo-go/common"
- perrors "github.com/pkg/errors"
-)
-
-//SerialID serial ID
-type SerialID byte
-
-const (
- // S_Dubbo dubbo serial id
- S_Dubbo SerialID = 2
-)
-
-//CallType call type
-type CallType int32
-
-const (
- // CT_UNKNOWN unknown call type
- CT_UNKNOWN CallType = 0
- // CT_OneWay call one way
- CT_OneWay CallType = 1
- // CT_TwoWay call in request/response
- CT_TwoWay CallType = 2
-)
-
-////////////////////////////////////////////
-// dubbo package
-////////////////////////////////////////////
-
-// SequenceType ...
-type SequenceType int64
-
-// DubboPackage ...
-type DubboPackage struct {
- Header hessian.DubboHeader
- Service hessian.Service
- Body interface{}
- Err error
-}
-
-// String prints dubbo package detail include header、path、body etc.
-func (p DubboPackage) String() string {
- return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body)
-}
-
-// Marshal encode hessian package.
-func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
- codec := hessian.NewHessianCodec(nil)
-
- pkg, err := codec.Write(p.Service, p.Header, p.Body)
- if err != nil {
- return nil, perrors.WithStack(err)
- }
-
- return bytes.NewBuffer(pkg), nil
-}
-
-// Unmarshal dncode hessian package.
-func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error {
- // fix issue https://github.com/apache/dubbo-go/issues/380
- bufLen := buf.Len()
- if bufLen < hessian.HEADER_LENGTH {
- return perrors.WithStack(hessian.ErrHeaderNotEnough)
- }
-
- codec := hessian.NewHessianCodec(bufio.NewReaderSize(buf, bufLen))
-
- // read header
- err := codec.ReadHeader(&p.Header)
- if err != nil {
- return perrors.WithStack(err)
- }
-
- if len(opts) != 0 { // for client
- client, ok := opts[0].(*Client)
- if !ok {
- return perrors.Errorf("opts[0] is not of type *Client")
- }
-
- if p.Header.Type&hessian.PackageRequest != 0x00 {
- // size of this array must be '7'
- // https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272
- p.Body = make([]interface{}, 7)
- } else {
- pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID))
- if !ok {
- return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
- }
- p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).response.reply}
- }
- }
-
- // read body
- err = codec.ReadBody(p.Body)
- return perrors.WithStack(err)
-}
-
-////////////////////////////////////////////
-// PendingResponse
-////////////////////////////////////////////
-
-// PendingResponse is a pending response.
-type PendingResponse struct {
- seq uint64
- err error
- start time.Time
- readStart time.Time
- callback common.AsyncCallback
- response *Response
- done chan struct{}
-}
-
-// NewPendingResponse create a PendingResponses.
-func NewPendingResponse() *PendingResponse {
- return &PendingResponse{
- start: time.Now(),
- response: &Response{},
- done: make(chan struct{}),
- }
-}
-
-// GetCallResponse get AsyncCallbackResponse.
-func (r PendingResponse) GetCallResponse() common.CallbackResponse {
- return AsyncCallbackResponse{
- Cause: r.err,
- Start: r.start,
- ReadStart: r.readStart,
- Reply: r.response,
- }
-}
diff --git a/protocol/dubbo/codec_test.go b/protocol/dubbo/codec_test.go
deleted file mode 100644
index 5dc71f0..0000000
--- a/protocol/dubbo/codec_test.go
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package dubbo
-
-import (
- "bytes"
- "testing"
- "time"
-)
-
-import (
- hessian "github.com/apache/dubbo-go-hessian2"
- perrors "github.com/pkg/errors"
- "github.com/stretchr/testify/assert"
-)
-
-func TestDubboPackage_MarshalAndUnmarshal(t *testing.T) {
- pkg := &DubboPackage{}
- pkg.Body = []interface{}{"a"}
- pkg.Header.Type = hessian.PackageHeartbeat
- pkg.Header.SerialID = byte(S_Dubbo)
- pkg.Header.ID = 10086
-
- // heartbeat
- data, err := pkg.Marshal()
- assert.NoError(t, err)
-
- pkgres := &DubboPackage{}
- pkgres.Body = []interface{}{}
- err = pkgres.Unmarshal(data)
- assert.NoError(t, err)
- assert.Equal(t, hessian.PackageHeartbeat|hessian.PackageRequest|hessian.PackageRequest_TwoWay, pkgres.Header.Type)
- assert.Equal(t, byte(S_Dubbo), pkgres.Header.SerialID)
- assert.Equal(t, int64(10086), pkgres.Header.ID)
- assert.Equal(t, 0, len(pkgres.Body.([]interface{})))
-
- // request
- pkg.Header.Type = hessian.PackageRequest
- pkg.Service.Interface = "Service"
- pkg.Service.Path = "path"
- pkg.Service.Version = "2.6"
- pkg.Service.Method = "Method"
- pkg.Service.Timeout = time.Second
- data, err = pkg.Marshal()
- assert.NoError(t, err)
-
- pkgres = &DubboPackage{}
- pkgres.Body = make([]interface{}, 7)
- err = pkgres.Unmarshal(data)
- assert.NoError(t, err)
- assert.Equal(t, hessian.PackageRequest, pkgres.Header.Type)
- assert.Equal(t, byte(S_Dubbo), pkgres.Header.SerialID)
- assert.Equal(t, int64(10086), pkgres.Header.ID)
- assert.Equal(t, "2.0.2", pkgres.Body.([]interface{})[0])
- assert.Equal(t, "path", pkgres.Body.([]interface{})[1])
- assert.Equal(t, "2.6", pkgres.Body.([]interface{})[2])
- assert.Equal(t, "Method", pkgres.Body.([]interface{})[3])
- assert.Equal(t, "Ljava/lang/String;", pkgres.Body.([]interface{})[4])
- assert.Equal(t, []interface{}{"a"}, pkgres.Body.([]interface{})[5])
- assert.Equal(t, map[string]string{"dubbo": "2.0.2", "interface": "Service", "path": "path", "timeout": "1000", "version": "2.6"}, pkgres.Body.([]interface{})[6])
-}
-
-func TestIssue380(t *testing.T) {
- pkg := &DubboPackage{}
- buf := bytes.NewBuffer([]byte("hello"))
- err := pkg.Unmarshal(buf)
- assert.True(t, perrors.Cause(err) == hessian.ErrHeaderNotEnough)
-}
diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go
index 59202d5..1907c38 100644
--- a/protocol/dubbo/dubbo_invoker.go
+++ b/protocol/dubbo/dubbo_invoker.go
@@ -35,6 +35,7 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
+ "github.com/apache/dubbo-go/protocol/dubbo/impl/remoting"
invocation_impl "github.com/apache/dubbo-go/protocol/invocation"
)
@@ -52,14 +53,14 @@ var (
// DubboInvoker is dubbo client invoker.
type DubboInvoker struct {
protocol.BaseInvoker
- client *Client
+ client *remoting.Client
quitOnce sync.Once
// Used to record the number of requests. -1 represent this DubboInvoker is destroyed
reqNum int64
}
-// NewDubboInvoker create dubbo client invoker.
-func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker {
+// NewDubboInvoker ...
+func NewDubboInvoker(url common.URL, client *remoting.Client) *DubboInvoker {
return &DubboInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
client: client,
@@ -94,29 +95,33 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
di.appendCtx(ctx, inv)
url := di.GetUrl()
+ // default hessian2 serialization, compatible
+ if url.GetParam("serialization", "") == "" {
+ url.SetParam("serialization", constant.HESSIAN2_SERIALIZATION)
+ }
// async
async, err := strconv.ParseBool(inv.AttachmentsByKey(constant.ASYNC_KEY, "false"))
if err != nil {
logger.Errorf("ParseBool - error: %v", err)
async = false
}
- response := NewResponse(inv.Reply(), nil)
+ response := remoting.NewResponse(inv.Reply(), nil)
if async {
if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok {
- result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response)
+ result.Err = di.client.AsyncCall(remoting.NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response)
} else {
- result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()))
+ result.Err = di.client.CallOneway(remoting.NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()))
}
} else {
if inv.Reply() == nil {
result.Err = ErrNoReply
} else {
- result.Err = di.client.Call(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), response)
+ result.Err = di.client.Call(remoting.NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), response)
}
}
if result.Err == nil {
result.Rest = inv.Reply()
- result.Attrs = response.atta
+ result.Attrs = response.Atta
}
logger.Debugf("result.Err: %v, result.Rest: %v", result.Err, result.Rest)
diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go
index 1a64301..e96e859 100644
--- a/protocol/dubbo/dubbo_invoker_test.go
+++ b/protocol/dubbo/dubbo_invoker_test.go
@@ -32,21 +32,22 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/protocol/dubbo/impl/remoting"
"github.com/apache/dubbo-go/protocol/invocation"
)
func TestDubboInvoker_Invoke(t *testing.T) {
proto, url := InitTest(t)
- c := &Client{
- pendingResponses: new(sync.Map),
- conf: *clientConf,
- opts: Options{
+ c := &remoting.Client{
+ PendingResponses: new(sync.Map),
+ Conf: *remoting.GetClientConf(),
+ Opts: remoting.Options{
ConnectTimeout: 3 * time.Second,
RequestTimeout: 6 * time.Second,
},
}
- c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
+ c.Pool = remoting.NewGettyRPCClientConnPool(c, remoting.GetClientConf().PoolSize, time.Duration(int(time.Second)*remoting.GetClientConf().PoolTTL))
invoker := NewDubboInvoker(url, c)
user := &User{}
@@ -69,8 +70,8 @@ func TestDubboInvoker_Invoke(t *testing.T) {
lock := sync.Mutex{}
lock.Lock()
inv.SetCallBack(func(response common.CallbackResponse) {
- r := response.(AsyncCallbackResponse)
- assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User))
+ r := response.(remoting.AsyncCallbackResponse)
+ assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*remoting.Response).Reply.(*User))
lock.Unlock()
})
res = invoker.Invoke(context.Background(), inv)
diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go
index 9eeefd0..cc67569 100644
--- a/protocol/dubbo/dubbo_protocol.go
+++ b/protocol/dubbo/dubbo_protocol.go
@@ -29,9 +29,9 @@ import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/protocol"
+ "github.com/apache/dubbo-go/protocol/dubbo/impl/remoting"
)
-// dubbo protocol constant
const (
// DUBBO is dubbo protocol name
DUBBO = "dubbo"
@@ -48,7 +48,7 @@ var (
// DubboProtocol is a dubbo protocol implement.
type DubboProtocol struct {
protocol.BaseProtocol
- serverMap map[string]*Server
+ serverMap map[string]*remoting.Server
serverLock sync.Mutex
}
@@ -56,7 +56,7 @@ type DubboProtocol struct {
func NewDubboProtocol() *DubboProtocol {
return &DubboProtocol{
BaseProtocol: protocol.NewBaseProtocol(),
- serverMap: make(map[string]*Server),
+ serverMap: make(map[string]*remoting.Server),
}
}
@@ -67,7 +67,6 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
exporter := NewDubboExporter(serviceKey, invoker, dp.ExporterMap())
dp.SetExporterMap(serviceKey, exporter)
logger.Infof("Export service: %s", url.String())
-
// start server
dp.openServer(url)
return exporter
@@ -83,7 +82,7 @@ func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker {
requestTimeout = t
}
- invoker := NewDubboInvoker(url, NewClient(Options{
+ invoker := NewDubboInvoker(url, remoting.NewClient(remoting.Options{
ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
RequestTimeout: requestTimeout,
}))
@@ -116,7 +115,7 @@ func (dp *DubboProtocol) openServer(url common.URL) {
dp.serverLock.Lock()
_, ok = dp.serverMap[url.Location]
if !ok {
- srv := NewServer()
+ srv := remoting.NewServer(NewStubHandler())
dp.serverMap[url.Location] = srv
srv.Start(url)
}
diff --git a/protocol/dubbo/dubbo_protocol_test.go b/protocol/dubbo/dubbo_protocol_test.go
index 14f6868..1915a07 100644
--- a/protocol/dubbo/dubbo_protocol_test.go
+++ b/protocol/dubbo/dubbo_protocol_test.go
@@ -22,19 +22,21 @@ import (
)
import (
- "github.com/stretchr/testify/assert"
-)
-
-import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
+ "github.com/apache/dubbo-go/protocol/dubbo/impl/remoting"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
)
func TestDubboProtocol_Export(t *testing.T) {
+ srvCfg := remoting.GetDefaultServerConfig()
+ remoting.SetServerConfig(srvCfg)
// Export
proto := GetProtocol()
- srvConf = &ServerConfig{}
url, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
@@ -75,6 +77,8 @@ func TestDubboProtocol_Export(t *testing.T) {
}
func TestDubboProtocol_Refer(t *testing.T) {
+ cliCfg := remoting.GetDefaultClientConfig()
+ remoting.SetClientConf(cliCfg)
// Refer
proto := GetProtocol()
url, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&" +
@@ -83,7 +87,6 @@ func TestDubboProtocol_Refer(t *testing.T) {
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245")
assert.NoError(t, err)
- clientConf = &ClientConfig{}
invoker := proto.Refer(url)
// make sure url
diff --git a/protocol/dubbo/impl/codec.go b/protocol/dubbo/impl/codec.go
new file mode 100644
index 0000000..f527bbc
--- /dev/null
+++ b/protocol/dubbo/impl/codec.go
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package impl
+
+import (
+ "bufio"
+ "encoding/binary"
+)
+
+import (
+ hessian "github.com/apache/dubbo-go-hessian2"
+ "github.com/pkg/errors"
+)
+
+import (
+ "github.com/apache/dubbo-go/common/logger"
+)
+
+type DubboCodec struct {
+ reader *bufio.Reader
+ pkgType PackageType
+ bodyLen int
+ serializer Serializer
+ headerRead bool
+}
+
+//CallType call type
+type CallType int32
+
+const (
+ // CT_UNKNOWN unknown call type
+ CT_UNKNOWN CallType = 0
+ // CT_OneWay call one way
+ CT_OneWay CallType = 1
+ // CT_TwoWay call in request/response
+ CT_TwoWay CallType = 2
+)
+
+////////////////////////////////////////////
+// dubbo package
+////////////////////////////////////////////
+type SequenceType int64
+
+func (c *DubboCodec) ReadHeader(header *DubboHeader) error {
+ var err error
+ if c.reader.Size() < HEADER_LENGTH {
+ return hessian.ErrHeaderNotEnough
+ }
+ buf, err := c.reader.Peek(HEADER_LENGTH)
+ if err != nil { // this is impossible
+ return errors.WithStack(err)
+ }
+ _, err = c.reader.Discard(HEADER_LENGTH)
+ if err != nil { // this is impossible
+ return errors.WithStack(err)
+ }
+
+ //// read header
+ if buf[0] != MAGIC_HIGH && buf[1] != MAGIC_LOW {
+ return hessian.ErrIllegalPackage
+ }
+
+ // Header{serialization id(5 bit), event, two way, req/response}
+ if header.SerialID = buf[2] & SERIAL_MASK; header.SerialID == Zero {
+ return errors.Errorf("serialization ID:%v", header.SerialID)
+ }
+
+ flag := buf[2] & FLAG_EVENT
+ if flag != Zero {
+ header.Type |= PackageHeartbeat
+ }
+ flag = buf[2] & FLAG_REQUEST
+ if flag != Zero {
+ header.Type |= PackageRequest
+ flag = buf[2] & FLAG_TWOWAY
+ if flag != Zero {
+ header.Type |= PackageRequest_TwoWay
+ }
+ } else {
+ header.Type |= PackageResponse
+ header.ResponseStatus = buf[3]
+ if header.ResponseStatus != Response_OK {
+ header.Type |= PackageResponse_Exception
+ }
+ }
+
+ // Header{req id}
+ header.ID = int64(binary.BigEndian.Uint64(buf[4:]))
+
+ // Header{body len}
+ header.BodyLen = int(binary.BigEndian.Uint32(buf[12:]))
+ if header.BodyLen < 0 {
+ return hessian.ErrIllegalPackage
+ }
+
+ c.pkgType = header.Type
+ c.bodyLen = header.BodyLen
+
+ if c.reader.Buffered() < c.bodyLen {
+ return hessian.ErrBodyNotEnough
+ }
+ c.headerRead = true
+ return errors.WithStack(err)
+}
+
+func (c *DubboCodec) EncodeHeader(p DubboPackage) []byte {
+ header := p.Header
+ bs := make([]byte, 0)
+ switch header.Type {
+ case PackageHeartbeat:
+ if header.ResponseStatus == Zero {
+ bs = append(bs, hessian.DubboRequestHeartbeatHeader[:]...)
+ } else {
+ bs = append(bs, hessian.DubboResponseHeartbeatHeader[:]...)
+ }
+ case PackageResponse:
+ bs = append(bs, hessian.DubboResponseHeaderBytes[:]...)
+ if header.ResponseStatus != 0 {
+ bs[3] = header.ResponseStatus
+ }
+ case PackageRequest_TwoWay:
+ bs = append(bs, hessian.DubboRequestHeaderBytesTwoWay[:]...)
+ }
+ bs[2] |= header.SerialID & hessian.SERIAL_MASK
+ binary.BigEndian.PutUint64(bs[4:], uint64(header.ID))
+ return bs
+}
+
+func (c *DubboCodec) Encode(p DubboPackage) ([]byte, error) {
+ // header
+ if c.serializer == nil {
+ return nil, errors.New("serializer should not be nil")
+ }
+ header := p.Header
+ switch header.Type {
+ case PackageHeartbeat:
+ if header.ResponseStatus == Zero {
+ return packRequest(p, c.serializer)
+ }
+ return packResponse(p, c.serializer)
+
+ case PackageRequest, PackageRequest_TwoWay:
+ return packRequest(p, c.serializer)
+
+ case PackageResponse:
+ return packResponse(p, c.serializer)
+
+ default:
+ return nil, errors.Errorf("Unrecognised message type: %v", header.Type)
+ }
+}
+
+func (c *DubboCodec) Decode(p *DubboPackage) error {
+ if !c.headerRead {
+ if err := c.ReadHeader(&p.Header); err != nil {
+ return err
+ }
+ }
+ body, err := c.reader.Peek(p.GetBodyLen())
+ if err != nil {
+ return err
+ }
+ if p.IsResponseWithException() {
+ logger.Infof("response with exception: %+v", p.Header)
+ decoder := hessian.NewDecoder(body)
+ exception, err := decoder.Decode()
+ if err != nil {
+ return errors.WithStack(err)
+ }
+ rsp, ok := p.Body.(*ResponsePayload)
+ if !ok {
+ return errors.Errorf("java exception:%s", exception.(string))
+ }
+ rsp.Exception = errors.Errorf("java exception:%s", exception.(string))
+ return nil
+ } else if p.IsHeartBeat() {
+ // heartbeat no need to unmarshal contents
+ return nil
+ }
+ if c.serializer == nil {
+ return errors.New("Codec serializer is nil")
+ }
+ return c.serializer.Unmarshal(body, p)
+}
+
+func (c *DubboCodec) SetSerializer(serializer Serializer) {
+ c.serializer = serializer
+}
+
+func packRequest(p DubboPackage, serializer Serializer) ([]byte, error) {
+ var (
+ byteArray []byte
+ pkgLen int
+ )
+
+ header := p.Header
+
+ //////////////////////////////////////////
+ // byteArray
+ //////////////////////////////////////////
+ // magic
+ switch header.Type {
+ case PackageHeartbeat:
+ byteArray = append(byteArray, DubboRequestHeartbeatHeader[:]...)
+ case PackageRequest_TwoWay:
+ byteArray = append(byteArray, DubboRequestHeaderBytesTwoWay[:]...)
+ default:
+ byteArray = append(byteArray, DubboRequestHeaderBytes[:]...)
+ }
+
+ // serialization id, two way flag, event, request/response flag
+ // SerialID is id of serialization approach in java dubbo
+ byteArray[2] |= header.SerialID & SERIAL_MASK
+ // request id
+ binary.BigEndian.PutUint64(byteArray[4:], uint64(header.ID))
+
+ //////////////////////////////////////////
+ // body
+ //////////////////////////////////////////
+ if p.IsHeartBeat() {
+ byteArray = append(byteArray, byte('N'))
+ pkgLen = 1
+ } else {
+ body, err := serializer.Marshal(p)
+ if err != nil {
+ return nil, err
+ }
+ pkgLen = len(body)
+ if pkgLen > int(DEFAULT_LEN) { // 8M
+ return nil, errors.Errorf("Data length %d too large, max payload %d", pkgLen, DEFAULT_LEN)
+ }
+ byteArray = append(byteArray, body...)
+ }
+ binary.BigEndian.PutUint32(byteArray[12:], uint32(pkgLen))
+ return byteArray, nil
+}
+
+func packResponse(p DubboPackage, serializer Serializer) ([]byte, error) {
+ var (
+ byteArray []byte
+ )
+ header := p.Header
+ hb := p.IsHeartBeat()
+
+ // magic
+ if hb {
+ byteArray = append(byteArray, DubboResponseHeartbeatHeader[:]...)
+ } else {
+ byteArray = append(byteArray, DubboResponseHeaderBytes[:]...)
+ }
+ // set serialID, identify serialization types, eg: fastjson->6, hessian2->2
+ byteArray[2] |= header.SerialID & SERIAL_MASK
+ // response status
+ if header.ResponseStatus != 0 {
+ byteArray[3] = header.ResponseStatus
+ }
+
+ // request id
+ binary.BigEndian.PutUint64(byteArray[4:], uint64(header.ID))
+
+ // body
+ body, err := serializer.Marshal(p)
+ if err != nil {
+ return nil, err
+ }
+
+ pkgLen := len(body)
+ if pkgLen > int(DEFAULT_LEN) { // 8M
+ return nil, errors.Errorf("Data length %d too large, max payload %d", pkgLen, DEFAULT_LEN)
+ }
+ // byteArray{body length}
+ binary.BigEndian.PutUint32(byteArray[12:], uint32(pkgLen))
+ byteArray = append(byteArray, body...)
+ return byteArray, nil
+}
+
+func NewDubboCodec(reader *bufio.Reader) *DubboCodec {
+ return &DubboCodec{
+ reader: reader,
+ pkgType: 0,
+ bodyLen: 0,
+ headerRead: false,
+ }
+}
diff --git a/protocol/dubbo/impl/codec_test.go b/protocol/dubbo/impl/codec_test.go
new file mode 100644
index 0000000..c93f307
--- /dev/null
+++ b/protocol/dubbo/impl/codec_test.go
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package impl
+
+import (
+ "testing"
+ "time"
+)
+
+import (
+ "github.com/golang/protobuf/proto"
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "github.com/apache/dubbo-go/common/constant"
+ pb "github.com/apache/dubbo-go/protocol/dubbo/impl/proto"
+)
+
+func TestDubboPackage_MarshalAndUnmarshal(t *testing.T) {
+ pkg := NewDubboPackage(nil)
+ pkg.Body = []interface{}{"a"}
+ pkg.Header.Type = PackageHeartbeat
+ pkg.Header.SerialID = constant.S_Hessian2
+ pkg.Header.ID = 10086
+ pkg.SetSerializer(HessianSerializer{})
+
+ // heartbeat
+ data, err := pkg.Marshal()
+ assert.NoError(t, err)
+
+ pkgres := NewDubboPackage(data)
+ pkgres.SetSerializer(HessianSerializer{})
+
+ pkgres.Body = []interface{}{}
+ err = pkgres.Unmarshal()
+ assert.NoError(t, err)
+ assert.Equal(t, PackageHeartbeat|PackageRequest|PackageRequest_TwoWay, pkgres.Header.Type)
+ assert.Equal(t, constant.S_Hessian2, pkgres.Header.SerialID)
+ assert.Equal(t, int64(10086), pkgres.Header.ID)
+ assert.Equal(t, 0, len(pkgres.Body.([]interface{})))
+
+ // request
+ pkg.Header.Type = PackageRequest
+ pkg.Service.Interface = "Service"
+ pkg.Service.Path = "path"
+ pkg.Service.Version = "2.6"
+ pkg.Service.Method = "Method"
+ pkg.Service.Timeout = time.Second
+ data, err = pkg.Marshal()
+ assert.NoError(t, err)
+
+ pkgres = NewDubboPackage(data)
+ pkgres.SetSerializer(HessianSerializer{})
+ pkgres.Body = make([]interface{}, 7)
+ err = pkgres.Unmarshal()
+ reassembleBody := pkgres.GetBody().(map[string]interface{})
+ assert.NoError(t, err)
+ assert.Equal(t, PackageRequest, pkgres.Header.Type)
+ assert.Equal(t, constant.S_Hessian2, pkgres.Header.SerialID)
+ assert.Equal(t, int64(10086), pkgres.Header.ID)
+ assert.Equal(t, "2.0.2", reassembleBody["dubboVersion"].(string))
+ assert.Equal(t, "path", pkgres.Service.Path)
+ assert.Equal(t, "2.6", pkgres.Service.Version)
+ assert.Equal(t, "Method", pkgres.Service.Method)
+ assert.Equal(t, "Ljava/lang/String;", reassembleBody["argsTypes"].(string))
+ assert.Equal(t, []interface{}{"a"}, reassembleBody["args"])
+ assert.Equal(t, map[string]string{"dubbo": "2.0.2", "interface": "Service", "path": "path", "timeout": "1000", "version": "2.6"}, reassembleBody["attachments"].(map[string]string))
+}
+
+func TestDubboPackage_Protobuf_Serialization_Request(t *testing.T) {
+ pkg := NewDubboPackage(nil)
+ pkg.Body = []interface{}{"a"}
+ pkg.Header.Type = PackageHeartbeat
+ pkg.Header.SerialID = constant.S_Proto
+ pkg.Header.ID = 10086
+ pkg.SetSerializer(ProtoSerializer{})
+
+ // heartbeat
+ data, err := pkg.Marshal()
+ assert.NoError(t, err)
+
+ pkgres := NewDubboPackage(data)
+ pkgres.SetSerializer(HessianSerializer{})
+
+ pkgres.Body = []interface{}{}
+ err = pkgres.Unmarshal()
+ assert.NoError(t, err)
+ assert.Equal(t, PackageHeartbeat|PackageRequest|PackageRequest_TwoWay, pkgres.Header.Type)
+ assert.Equal(t, constant.S_Proto, pkgres.Header.SerialID)
+ assert.Equal(t, int64(10086), pkgres.Header.ID)
+ assert.Equal(t, 0, len(pkgres.Body.([]interface{})))
+
+ // request
+ pkg.Header.Type = PackageRequest
+ pkg.Service.Interface = "Service"
+ pkg.Service.Path = "path"
+ pkg.Service.Version = "2.6"
+ pkg.Service.Method = "Method"
+ pkg.Service.Timeout = time.Second
+ pkg.SetBody([]interface{}{&pb.StringValue{Value: "hello world"}})
+ data, err = pkg.Marshal()
+ assert.NoError(t, err)
+
+ pkgres = NewDubboPackage(data)
+ pkgres.SetSerializer(ProtoSerializer{})
+ err = pkgres.Unmarshal()
+ assert.NoError(t, err)
+ body, ok := pkgres.Body.(map[string]interface{})
+ assert.Equal(t, ok, true)
+ req, ok := body["args"].([]interface{})
+ assert.Equal(t, ok, true)
+ // protobuf rpc just has exact one parameter
+ assert.Equal(t, len(req), 1)
+ argsBytes, ok := req[0].([]byte)
+ assert.Equal(t, ok, true)
+ sv := pb.StringValue{}
+ buf := proto.NewBuffer(argsBytes)
+ err = buf.Unmarshal(&sv)
+ assert.NoError(t, err)
+ assert.Equal(t, sv.Value, "hello world")
+}
+
+func TestDubboCodec_Protobuf_Serialization_Response(t *testing.T) {
+ {
+ pkg := NewDubboPackage(nil)
+ pkg.Header.Type = PackageResponse
+ pkg.Header.SerialID = constant.S_Proto
+ pkg.Header.ID = 10086
+ pkg.SetSerializer(ProtoSerializer{})
+ pkg.SetBody(&pb.StringValue{Value: "hello world"})
+
+ // heartbeat
+ data, err := pkg.Marshal()
+ assert.NoError(t, err)
+
+ pkgres := NewDubboPackage(data)
+ pkgres.SetSerializer(ProtoSerializer{})
+
+ pkgres.SetBody(&pb.StringValue{})
+ err = pkgres.Unmarshal()
+
+ assert.NoError(t, err)
+ assert.Equal(t, pkgres.Header.Type, PackageResponse)
+ assert.Equal(t, constant.S_Proto, pkgres.Header.SerialID)
+ assert.Equal(t, int64(10086), pkgres.Header.ID)
+
+ res, ok := pkgres.Body.(*pb.StringValue)
+ assert.Equal(t, ok, true)
+ assert.Equal(t, res.Value, "hello world")
+ }
+
+ // with attachments
+ {
+ attas := make(map[string]string)
+ attas["k1"] = "test"
+ resp := NewResponsePayload(&pb.StringValue{Value: "attachments"}, nil, attas)
+ p := NewDubboPackage(nil)
+ p.Header.Type = PackageResponse
+ p.Header.SerialID = constant.S_Proto
+ p.SetSerializer(ProtoSerializer{})
+ p.SetBody(resp)
+ data, err := p.Marshal()
+ assert.NoError(t, err)
+
+ pkgres := NewDubboPackage(data)
+ pkgres.Header.Type = PackageResponse
+ pkgres.Header.SerialID = constant.S_Proto
+ pkgres.Header.ID = 10086
+ pkgres.SetSerializer(ProtoSerializer{})
+
+ resAttachment := make(map[string]string)
+ resBody := &pb.StringValue{}
+ pkgres.SetBody(NewResponsePayload(resBody, nil, resAttachment))
+
+ err = pkgres.Unmarshal()
+ assert.NoError(t, err)
+ assert.Equal(t, "attachments", resBody.Value)
+ assert.Equal(t, "test", resAttachment["k1"])
+ }
+
+}
diff --git a/protocol/dubbo/impl/const.go b/protocol/dubbo/impl/const.go
new file mode 100644
index 0000000..06e73fa
--- /dev/null
+++ b/protocol/dubbo/impl/const.go
@@ -0,0 +1,243 @@
+package impl
+
+import (
+ "reflect"
+ "regexp"
+
+ "github.com/pkg/errors"
+)
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const (
+ DUBBO = "dubbo"
+)
+
+const (
+ mask = byte(127)
+ flag = byte(128)
+)
+
+const (
+ // Zero : byte zero
+ Zero = byte(0x00)
+)
+
+// constants
+const (
+ TAG_READ = int32(-1)
+ ASCII_GAP = 32
+ CHUNK_SIZE = 4096
+ BC_BINARY = byte('B') // final chunk
+ BC_BINARY_CHUNK = byte('A') // non-final chunk
+
+ BC_BINARY_DIRECT = byte(0x20) // 1-byte length binary
+ BINARY_DIRECT_MAX = byte(0x0f)
+ BC_BINARY_SHORT = byte(0x34) // 2-byte length binary
+ BINARY_SHORT_MAX = 0x3ff // 0-1023 binary
+
+ BC_DATE = byte(0x4a) // 64-bit millisecond UTC date
+ BC_DATE_MINUTE = byte(0x4b) // 32-bit minute UTC date
+
+ BC_DOUBLE = byte('D') // IEEE 64-bit double
+
+ BC_DOUBLE_ZERO = byte(0x5b)
+ BC_DOUBLE_ONE = byte(0x5c)
+ BC_DOUBLE_BYTE = byte(0x5d)
+ BC_DOUBLE_SHORT = byte(0x5e)
+ BC_DOUBLE_MILL = byte(0x5f)
+
+ BC_FALSE = byte('F') // boolean false
+
+ BC_INT = byte('I') // 32-bit int
+
+ INT_DIRECT_MIN = -0x10
+ INT_DIRECT_MAX = byte(0x2f)
+ BC_INT_ZERO = byte(0x90)
+
+ INT_BYTE_MIN = -0x800
+ INT_BYTE_MAX = 0x7ff
+ BC_INT_BYTE_ZERO = byte(0xc8)
+
+ BC_END = byte('Z')
+
+ INT_SHORT_MIN = -0x40000
+ INT_SHORT_MAX = 0x3ffff
+ BC_INT_SHORT_ZERO = byte(0xd4)
+
+ BC_LIST_VARIABLE = byte(0x55)
+ BC_LIST_FIXED = byte('V')
+ BC_LIST_VARIABLE_UNTYPED = byte(0x57)
+ BC_LIST_FIXED_UNTYPED = byte(0x58)
+ _listFixedTypedLenTagMin = byte(0x70)
+ _listFixedTypedLenTagMax = byte(0x77)
+ _listFixedUntypedLenTagMin = byte(0x78)
+ _listFixedUntypedLenTagMax = byte(0x7f)
+
+ BC_LIST_DIRECT = byte(0x70)
+ BC_LIST_DIRECT_UNTYPED = byte(0x78)
+ LIST_DIRECT_MAX = byte(0x7)
+
+ BC_LONG = byte('L') // 64-bit signed integer
+ LONG_DIRECT_MIN = -0x08
+ LONG_DIRECT_MAX = byte(0x0f)
+ BC_LONG_ZERO = byte(0xe0)
+
+ LONG_BYTE_MIN = -0x800
+ LONG_BYTE_MAX = 0x7ff
+ BC_LONG_BYTE_ZERO = byte(0xf8)
+
+ LONG_SHORT_MIN = -0x40000
+ LONG_SHORT_MAX = 0x3ffff
+ BC_LONG_SHORT_ZERO = byte(0x3c)
+
+ BC_LONG_INT = byte(0x59)
+
+ BC_MAP = byte('M')
+ BC_MAP_UNTYPED = byte('H')
+
+ BC_NULL = byte('N') // x4e
+
+ BC_OBJECT = byte('O')
+ BC_OBJECT_DEF = byte('C')
+
+ BC_OBJECT_DIRECT = byte(0x60)
+ OBJECT_DIRECT_MAX = byte(0x0f)
+
+ BC_REF = byte(0x51)
+
+ BC_STRING = byte('S') // final string
+ BC_STRING_CHUNK = byte('R') // non-final string
+
+ BC_STRING_DIRECT = byte(0x00)
+ STRING_DIRECT_MAX = byte(0x1f)
+ BC_STRING_SHORT = byte(0x30)
+ STRING_SHORT_MAX = 0x3ff
+
+ BC_TRUE = byte('T')
+
+ P_PACKET_CHUNK = byte(0x4f)
+ P_PACKET = byte('P')
+
+ P_PACKET_DIRECT = byte(0x80)
+ PACKET_DIRECT_MAX = byte(0x7f)
+
+ P_PACKET_SHORT = byte(0x70)
+ PACKET_SHORT_MAX = 0xfff
+ ARRAY_STRING = "[string"
+ ARRAY_INT = "[int"
+ ARRAY_DOUBLE = "[double"
+ ARRAY_FLOAT = "[float"
+ ARRAY_BOOL = "[boolean"
+ ARRAY_LONG = "[long"
+
+ PATH_KEY = "path"
+ GROUP_KEY = "group"
+ INTERFACE_KEY = "interface"
+ VERSION_KEY = "version"
+ TIMEOUT_KEY = "timeout"
+
+ STRING_NIL = ""
+ STRING_TRUE = "true"
+ STRING_FALSE = "false"
+ STRING_ZERO = "0.0"
+ STRING_ONE = "1.0"
+)
+
+// ResponsePayload related consts
+const (
+ Response_OK byte = 20
+ Response_CLIENT_TIMEOUT byte = 30
+ Response_SERVER_TIMEOUT byte = 31
+ Response_BAD_REQUEST byte = 40
+ Response_BAD_RESPONSE byte = 50
+ Response_SERVICE_NOT_FOUND byte = 60
+ Response_SERVICE_ERROR byte = 70
+ Response_SERVER_ERROR byte = 80
+ Response_CLIENT_ERROR byte = 90
+
+ // According to "java dubbo" There are two cases of response:
+ // 1. with attachments
+ // 2. no attachments
+ RESPONSE_WITH_EXCEPTION int32 = 0
+ RESPONSE_VALUE int32 = 1
+ RESPONSE_NULL_VALUE int32 = 2
+ RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS int32 = 3
+ RESPONSE_VALUE_WITH_ATTACHMENTS int32 = 4
+ RESPONSE_NULL_VALUE_WITH_ATTACHMENTS int32 = 5
+)
+
+/**
+ * the dubbo protocol header length is 16 Bytes.
+ * the first 2 Bytes is magic code '0xdabb'
+ * the next 1 Byte is message flags, in which its 16-20 bit is serial id, 21 for event, 22 for two way, 23 for request/response flag
+ * the next 1 Bytes is response state.
+ * the next 8 Bytes is package ID.
+ * the next 4 Bytes is package length.
+ **/
+const (
+ // header length.
+ HEADER_LENGTH = 16
+
+ // magic header
+ MAGIC = uint16(0xdabb)
+ MAGIC_HIGH = byte(0xda)
+ MAGIC_LOW = byte(0xbb)
+
+ // message flag.
+ FLAG_REQUEST = byte(0x80)
+ FLAG_TWOWAY = byte(0x40)
+ FLAG_EVENT = byte(0x20) // for heartbeat
+ SERIAL_MASK = 0x1f
+
+ DUBBO_VERSION = "2.5.4"
+ DUBBO_VERSION_KEY = "dubbo"
+ DEFAULT_DUBBO_PROTOCOL_VERSION = "2.0.2" // Dubbo RPC protocol version, for compatibility, it must not be between 2.0.10 ~ 2.6.2
+ LOWEST_VERSION_FOR_RESPONSE_ATTACHMENT = 2000200
+ DEFAULT_LEN = 8388608 // 8 * 1024 * 1024 default body max length
+)
+
+// regular
+const (
+ JAVA_IDENT_REGEX = "(?:[_$a-zA-Z][_$a-zA-Z0-9]*)"
+ CLASS_DESC = "(?:L" + JAVA_IDENT_REGEX + "(?:\\/" + JAVA_IDENT_REGEX + ")*;)"
+ ARRAY_DESC = "(?:\\[+(?:(?:[VZBCDFIJS])|" + CLASS_DESC + "))"
+ DESC_REGEX = "(?:(?:[VZBCDFIJS])|" + CLASS_DESC + "|" + ARRAY_DESC + ")"
+)
+
+// Dubbo request response related consts
+var (
+ DubboRequestHeaderBytesTwoWay = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_REQUEST | FLAG_TWOWAY}
+ DubboRequestHeaderBytes = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_REQUEST}
+ DubboResponseHeaderBytes = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, Zero, Response_OK}
+ DubboRequestHeartbeatHeader = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_REQUEST | FLAG_TWOWAY | FLAG_EVENT}
+ DubboResponseHeartbeatHeader = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_EVENT}
+)
+
+// Error part
+var (
+ ErrHeaderNotEnough = errors.New("header buffer too short")
+ ErrBodyNotEnough = errors.New("body buffer too short")
+ ErrJavaException = errors.New("got java exception")
+ ErrIllegalPackage = errors.New("illegal package!")
+)
+
+// DescRegex ...
+var DescRegex, _ = regexp.Compile(DESC_REGEX)
+
+var NilValue = reflect.Zero(reflect.TypeOf((*interface{})(nil)).Elem())
diff --git a/protocol/dubbo/impl/hessian.go b/protocol/dubbo/impl/hessian.go
new file mode 100644
index 0000000..513421b
--- /dev/null
+++ b/protocol/dubbo/impl/hessian.go
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package impl
+
+import (
+ "math"
+ "reflect"
+ "strconv"
+ "strings"
+ "time"
+)
+
+import (
+ hessian "github.com/apache/dubbo-go-hessian2"
+ "github.com/apache/dubbo-go-hessian2/java_exception"
+ perrors "github.com/pkg/errors"
+)
+
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/logger"
+)
+
+// Object is an alias-like name for the empty interface; getArgType maps a
+// []Object argument to the Java descriptor "[Ljava.lang.Object;".
+type Object interface{}
+
+// HessianSerializer marshals and unmarshals DubboPackage bodies with hessian2.
+// It carries no state; init registers it under the name "hessian2".
+type HessianSerializer struct {
+}
+
+// Marshal encodes the body of p with a fresh hessian2 encoder. Request packages
+// are handled by marshalRequest, everything else by marshalResponse.
+func (h HessianSerializer) Marshal(p DubboPackage) ([]byte, error) {
+	enc := hessian.NewEncoder()
+	if !p.IsRequest() {
+		return marshalResponse(enc, p)
+	}
+	return marshalRequest(enc, p)
+}
+
+// Unmarshal decodes input into p. Heartbeat packages are accepted without
+// touching the body; requests and responses are dispatched to their decoders.
+func (h HessianSerializer) Unmarshal(input []byte, p *DubboPackage) error {
+	switch {
+	case p.IsHeartBeat():
+		return nil
+	case p.IsRequest():
+		return unmarshalRequestBody(input, p)
+	default:
+		return unmarshalResponseBody(input, p)
+	}
+}
+
+// marshalResponse encodes a response package with the given hessian2 encoder
+// and returns the encoded bytes with a trailing 'N' byte appended.
+//
+// With ResponseStatus == Response_OK the body is: nil for heartbeats, otherwise
+// a response-type marker followed by the exception or the result and, when the
+// peer version supports it, the attachments. With any other status only the
+// exception message (or RspObj when there is no exception) is written.
+//
+// NOTE(review): the error results of encoder.Encode are discarded, so this
+// function always returns a nil error.
+func marshalResponse(encoder *hessian.Encoder, p DubboPackage) ([]byte, error) {
+ header := p.Header
+ response := EnsureResponsePayload(p.Body)
+ if header.ResponseStatus == Response_OK {
+ if p.IsHeartBeat() {
+ encoder.Encode(nil)
+ } else {
+ // attachments are only written when the version found in the
+ // attachments under DUBBO_VERSION_KEY supports them
+ atta := isSupportResponseAttachment(response.Attachments[DUBBO_VERSION_KEY])
+
+ // pick the marker set matching the attachment decision
+ var resWithException, resValue, resNullValue int32
+ if atta {
+ resWithException = RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS
+ resValue = RESPONSE_VALUE_WITH_ATTACHMENTS
+ resNullValue = RESPONSE_NULL_VALUE_WITH_ATTACHMENTS
+ } else {
+ resWithException = RESPONSE_WITH_EXCEPTION
+ resValue = RESPONSE_VALUE
+ resNullValue = RESPONSE_NULL_VALUE
+ }
+
+ if response.Exception != nil { // throw error
+ encoder.Encode(resWithException)
+ // a Throwabler is encoded as is; any other error is wrapped
+ // into a java Throwable carrying only its message
+ if t, ok := response.Exception.(java_exception.Throwabler); ok {
+ encoder.Encode(t)
+ } else {
+ encoder.Encode(java_exception.NewThrowable(response.Exception.Error()))
+ }
+ } else {
+ if response.RspObj == nil {
+ encoder.Encode(resNullValue)
+ } else {
+ encoder.Encode(resValue)
+ encoder.Encode(response.RspObj) // result
+ }
+ }
+
+ if atta {
+ encoder.Encode(response.Attachments) // attachments
+ }
+ }
+ } else {
+ // non-OK status: only the error text (or the raw result) is sent
+ if response.Exception != nil { // throw error
+ encoder.Encode(response.Exception.Error())
+ } else {
+ encoder.Encode(response.RspObj)
+ }
+ }
+ bs := encoder.Buffer()
+ // encNull
+ bs = append(bs, byte('N'))
+ return bs, nil
+}
+
+// marshalRequest encodes a request package with the given hessian2 encoder.
+//
+// Wire order: protocol version, service path, service version, method name,
+// the concatenated Java type descriptors of the arguments, each argument, and
+// finally the attachments. The routing keys (path, version and, when set,
+// group, interface and timeout in milliseconds) are written into
+// request.Attachments before it is encoded, so the payload's map is mutated.
+//
+// It returns an error when the params are not a []interface{}, when an argument
+// has no known Java type, or when the encoder rejects any value.
+func marshalRequest(encoder *hessian.Encoder, p DubboPackage) ([]byte, error) {
+	service := p.Service
+	request := EnsureRequestPayload(p.Body)
+
+	// Fixed preamble, in wire order.
+	preamble := []interface{}{DEFAULT_DUBBO_PROTOCOL_VERSION, service.Path, service.Version, service.Method}
+	for _, field := range preamble {
+		if err := encoder.Encode(field); err != nil {
+			return nil, perrors.WithStack(err)
+		}
+	}
+
+	args, ok := request.Params.([]interface{})
+	if !ok {
+		logger.Infof("request args are: %+v", request.Params)
+		return nil, perrors.Errorf("@params is not of type: []interface{}")
+	}
+	types, err := getArgsTypeList(args)
+	if err != nil {
+		return nil, perrors.Wrapf(err, " PackRequest(args:%+v)", args)
+	}
+	if err = encoder.Encode(types); err != nil {
+		return nil, perrors.WithStack(err)
+	}
+	for _, v := range args {
+		if err = encoder.Encode(v); err != nil {
+			return nil, perrors.WithStack(err)
+		}
+	}
+
+	// Writing to a nil map panics; a payload built without attachments must
+	// still be able to carry the routing keys.
+	if request.Attachments == nil {
+		request.Attachments = make(map[string]string)
+	}
+	request.Attachments[PATH_KEY] = service.Path
+	request.Attachments[VERSION_KEY] = service.Version
+	if len(service.Group) > 0 {
+		request.Attachments[GROUP_KEY] = service.Group
+	}
+	if len(service.Interface) > 0 {
+		request.Attachments[INTERFACE_KEY] = service.Interface
+	}
+	if service.Timeout != 0 {
+		request.Attachments[TIMEOUT_KEY] = strconv.Itoa(int(service.Timeout / time.Millisecond))
+	}
+
+	if err = encoder.Encode(request.Attachments); err != nil {
+		return nil, perrors.WithStack(err)
+	}
+	return encoder.Buffer(), nil
+}
+
+// versionInt is looked up by isSupportResponseAttachment as a cache from
+// version string to numeric version.
+// NOTE(review): nothing in this file ever stores into it, so every lookup
+// misses and the version is parsed on each call.
+var versionInt = make(map[string]int)
+
+// https://github.com/apache/dubbo/blob/dubbo-2.7.1/dubbo-common/src/main/java/org/apache/dubbo/common/Version.java#L96
+// isSupportResponseAttachment reports whether a peer announcing the given dubbo
+// version understands response attachments. Empty or unparsable versions and
+// versions in 2.0.10 ~ 2.6.2 do not; otherwise the version must be at least
+// LOWEST_VERSION_FOR_RESPONSE_ATTACHMENT.
+func isSupportResponseAttachment(version string) bool {
+	if len(version) == 0 {
+		return false
+	}
+
+	num, cached := versionInt[version]
+	if !cached {
+		if num = version2Int(version); num == -1 {
+			return false
+		}
+	}
+
+	switch {
+	case num >= 2001000 && num <= 2060200: // 2.0.10 ~ 2.6.2
+		return false
+	default:
+		return num >= LOWEST_VERSION_FOR_RESPONSE_ATTACHMENT
+	}
+}
+
+// version2Int converts a dotted version string into an integer in which every
+// component occupies two decimal digits ("1.2.3.4" -> 1020304). A version with
+// exactly three components is additionally multiplied by 100, so "2.0.2"
+// becomes 2000200. It returns -1 when any component is not a decimal integer.
+func version2Int(version string) int {
+	parts := strings.Split(version, ".")
+	count := len(parts)
+
+	total := 0
+	for idx := 0; idx < count; idx++ {
+		num, err := strconv.Atoi(parts[idx])
+		if err != nil {
+			return -1
+		}
+		// weight of this component: 100^(components to its right)
+		weight := int(math.Pow10((count - idx - 1) * 2))
+		total += num * weight
+	}
+
+	if count != 3 {
+		return total
+	}
+	return total * 100
+}
+
+// unmarshalRequestBody decodes a hessian2 request body into p.
+//
+// The decoded fields are first stored in a 7-slot []interface{} body
+// (0 dubbo version, 1 path, 2 service version, 3 method, 4 argument type
+// descriptors, 5 arguments, 6 attachments), which buildServerSidePackageBody
+// then turns into the final map body and Service of p.
+//
+// Input comes from the network, so malformed data is reported as an error
+// instead of panicking: the body slice must have at least 7 slots and the
+// argument type descriptor must decode to a string.
+func unmarshalRequestBody(body []byte, p *DubboPackage) error {
+	if p.Body == nil {
+		p.SetBody(make([]interface{}, 7))
+	}
+	req, ok := p.Body.([]interface{})
+	if !ok {
+		return perrors.Errorf("@reqObj is not of type: []interface{}")
+	}
+	if len(req) < 7 {
+		return perrors.Errorf("@reqObj should have at least 7 slots, got %d", len(req))
+	}
+
+	decoder := hessian.NewDecoder(body)
+
+	// dubbo version, target path, service version, method, argument types
+	for i := 0; i < 5; i++ {
+		field, err := decoder.Decode()
+		if err != nil {
+			return perrors.WithStack(err)
+		}
+		req[i] = field
+	}
+	dubboVersion := req[0]
+
+	argsTypes, ok := req[4].(string)
+	if !ok {
+		return perrors.Errorf("get wrong args types: %+v", req[4])
+	}
+
+	// one encoded value follows for every descriptor found in argsTypes
+	ats := hessian.DescRegex.FindAllString(argsTypes, -1)
+	var args []interface{}
+	for range ats {
+		arg, err := decoder.Decode()
+		if err != nil {
+			return perrors.WithStack(err)
+		}
+		args = append(args, arg)
+	}
+	req[5] = args
+
+	attachments, err := decoder.Decode()
+	if err != nil {
+		return perrors.WithStack(err)
+	}
+	v, ok := attachments.(map[interface{}]interface{})
+	if !ok {
+		return perrors.Errorf("get wrong attachments: %+v", attachments)
+	}
+	// expose the caller's dubbo version through the attachments as well
+	v[DUBBO_VERSION_KEY] = dubboVersion
+	req[6] = hessian.ToMapStringString(v)
+	buildServerSidePackageBody(p)
+	return nil
+}
+
+// unmarshalResponseBody decodes a hessian2 response body into the
+// ResponsePayload held by p (one is created when p.Body is nil).
+//
+// The first decoded value is the response type. Depending on it the function
+// reads an exception, a value (reflected into response.RspObj) or nothing, and
+// for the *_WITH_ATTACHMENTS types also the attachment map. A response type
+// that matches none of the known markers is ignored and nil is returned.
+func unmarshalResponseBody(body []byte, p *DubboPackage) error {
+	decoder := hessian.NewDecoder(body)
+	rspType, err := decoder.Decode()
+	if p.Body == nil {
+		p.SetBody(&ResponsePayload{})
+	}
+	if err != nil {
+		return perrors.WithStack(err)
+	}
+	response := EnsureResponsePayload(p.Body)
+
+	switch rspType {
+	case RESPONSE_WITH_EXCEPTION, RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
+		expt, err := decoder.Decode()
+		if err != nil {
+			return perrors.WithStack(err)
+		}
+		if rspType == RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS {
+			atta, err := decodeHessianAttachments(decoder)
+			if err != nil {
+				return err
+			}
+			response.Attachments = atta
+		}
+
+		// keep a decoded error as is, wrap anything else
+		if e, ok := expt.(error); ok {
+			response.Exception = e
+		} else {
+			response.Exception = perrors.Errorf("got exception: %+v", expt)
+		}
+		return nil
+
+	case RESPONSE_VALUE, RESPONSE_VALUE_WITH_ATTACHMENTS:
+		rsp, err := decoder.Decode()
+		if err != nil {
+			return perrors.WithStack(err)
+		}
+		if rspType == RESPONSE_VALUE_WITH_ATTACHMENTS {
+			atta, err := decodeHessianAttachments(decoder)
+			if err != nil {
+				return err
+			}
+			response.Attachments = atta
+		}
+
+		return perrors.WithStack(hessian.ReflectResponse(rsp, response.RspObj))
+
+	case RESPONSE_NULL_VALUE, RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
+		if rspType == RESPONSE_NULL_VALUE_WITH_ATTACHMENTS {
+			atta, err := decodeHessianAttachments(decoder)
+			if err != nil {
+				return err
+			}
+			response.Attachments = atta
+		}
+		return nil
+	}
+	return nil
+}
+
+// decodeHessianAttachments reads the next value from decoder and converts it to
+// a string map. It fails when decoding fails or the value is not a map.
+func decodeHessianAttachments(decoder *hessian.Decoder) (map[string]string, error) {
+	attachments, err := decoder.Decode()
+	if err != nil {
+		return nil, perrors.WithStack(err)
+	}
+	v, ok := attachments.(map[interface{}]interface{})
+	if !ok {
+		return nil, perrors.Errorf("get wrong attachments: %+v", attachments)
+	}
+	return hessian.ToMapStringString(v), nil
+}
+
+// buildServerSidePackageBody converts the positional request body of pkg
+// (a []interface{} filled by unmarshalRequestBody) into the Service of pkg and
+// a map body with the keys "dubboVersion", "argsTypes", "args", "service" and
+// "attachments". An empty slice leaves pkg untouched. Slots that are nil keep
+// their zero value; non-nil slots are type-asserted without a check.
+func buildServerSidePackageBody(pkg *DubboPackage) {
+	req := pkg.GetBody().([]interface{}) // length of body should be 7
+	if len(req) == 0 {
+		return
+	}
+
+	var (
+		dubboVersion, argsTypes string
+		args                    []interface{}
+		attachments             map[string]string
+		svc                     Service
+	)
+	if raw := req[0]; raw != nil {
+		dubboVersion = raw.(string)
+	}
+	if raw := req[1]; raw != nil {
+		svc.Path = raw.(string)
+	}
+	if raw := req[2]; raw != nil {
+		svc.Version = raw.(string)
+	}
+	if raw := req[3]; raw != nil {
+		svc.Method = raw.(string)
+	}
+	if raw := req[4]; raw != nil {
+		argsTypes = raw.(string)
+	}
+	if raw := req[5]; raw != nil {
+		args = raw.([]interface{})
+	}
+	if raw := req[6]; raw != nil {
+		attachments = raw.(map[string]string)
+	}
+
+	// the path attachment is only a fallback for an empty decoded path
+	if svc.Path == "" {
+		if path := attachments[constant.PATH_KEY]; len(path) > 0 {
+			svc.Path = path
+		}
+	}
+	// interface defaults to the path unless the attachment is present
+	svc.Interface = svc.Path
+	if iface, ok := attachments[constant.INTERFACE_KEY]; ok {
+		svc.Interface = iface
+	}
+	if group := attachments[constant.GROUP_KEY]; len(group) > 0 {
+		svc.Group = group
+	}
+
+	pkg.SetService(svc)
+	pkg.SetBody(map[string]interface{}{
+		"dubboVersion": dubboVersion,
+		"argsTypes":    argsTypes,
+		"args":         args,
+		"service":      common.ServiceMap.GetService(DUBBO, svc.Path), // path as a key
+		"attachments":  attachments,
+	})
+}
+
+// getArgsTypeList returns the concatenated Java type descriptors of args.
+//
+// For each argument getArgType supplies a Java type name which is converted as
+// follows: names without a dot are used verbatim, names starting with "[" only
+// get their dots replaced by slashes, and every other name becomes "L<name>;"
+// with dots replaced by slashes (java.util.List -> Ljava/util/List;).
+// It returns an empty string and an error when an argument has no known type.
+func getArgsTypeList(args []interface{}) (string, error) {
+	var types strings.Builder
+
+	for i := range args {
+		typ := getArgType(args[i])
+		if typ == "" {
+			return "", perrors.Errorf("can not get arg %#v type", args[i])
+		}
+		switch {
+		case !strings.Contains(typ, "."):
+			types.WriteString(typ)
+		case strings.HasPrefix(typ, "["):
+			types.WriteString(strings.Replace(typ, ".", "/", -1))
+		default:
+			// java.util.List -> Ljava/util/List;
+			types.WriteString("L")
+			types.WriteString(strings.Replace(typ, ".", "/", -1))
+			types.WriteString(";")
+		}
+	}
+
+	return types.String(), nil
+}
+
+// getArgType returns the Java type name (or descriptor) used to announce v in a
+// request: single letters for primitives, "[X" forms for slices of primitives,
+// dotted class names for everything else. It returns "V" for nil and "" when no
+// mapping exists.
+func getArgType(v interface{}) string {
+	if v == nil {
+		return "V"
+	}
+
+	switch tv := v.(type) {
+	// Serialized tags for base types
+	case bool:
+		return "Z"
+	case []bool:
+		return "[Z"
+	case byte:
+		return "B"
+	case []byte:
+		return "[B"
+	case int8:
+		return "B"
+	case []int8:
+		return "[B"
+	case int16:
+		return "S"
+	case []int16:
+		return "[S"
+	case uint16: // Equivalent to Char of Java
+		return "C"
+	case []uint16:
+		return "[C"
+	// case rune:
+	//	return "C"
+	case int:
+		return "J"
+	case []int:
+		return "[J"
+	case int32:
+		return "I"
+	case []int32:
+		return "[I"
+	case int64:
+		return "J"
+	case []int64:
+		return "[J"
+	case time.Time:
+		return "java.util.Date"
+	case []time.Time:
+		// an array-of-class descriptor must end with ';', like the
+		// []string and []Object cases below
+		return "[Ljava.util.Date;"
+	case float32:
+		return "F"
+	case []float32:
+		return "[F"
+	case float64:
+		return "D"
+	case []float64:
+		return "[D"
+	case string:
+		return "java.lang.String"
+	case []string:
+		return "[Ljava.lang.String;"
+	case []Object:
+		return "[Ljava.lang.Object;"
+	case map[interface{}]interface{}:
+		// return "java.util.HashMap"
+		return "java.util.Map"
+	case hessian.POJOEnum:
+		return tv.JavaClassName()
+	// Serialized tags for complex types
+	default:
+		t := reflect.TypeOf(v)
+		if reflect.Ptr == t.Kind() {
+			// NOTE(review): reflect.TypeOf is applied to the reflect.Value
+			// returned by Elem(), so t becomes the type of reflect.Value
+			// itself (a struct) and every pointer argument is reported as
+			// java.lang.Object. Kept as is because callers may rely on it.
+			t = reflect.TypeOf(reflect.ValueOf(v).Elem())
+		}
+		switch t.Kind() {
+		case reflect.Struct:
+			return "java.lang.Object"
+		case reflect.Slice, reflect.Array:
+			if t.Elem().Kind() == reflect.Struct {
+				return "[Ljava.lang.Object;"
+			}
+			// return "java.util.ArrayList"
+			return "java.util.List"
+		case reflect.Map: // Enter here, map may be map[string]int
+			return "java.util.Map"
+		default:
+			return ""
+		}
+	}
+}
+
+// init registers HessianSerializer under the name "hessian2".
+func init() {
+ SetSerializer("hessian2", HessianSerializer{})
+}
diff --git a/protocol/dubbo/impl/package.go b/protocol/dubbo/impl/package.go
new file mode 100644
index 0000000..d40c4af
--- /dev/null
+++ b/protocol/dubbo/impl/package.go
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package impl
+
+import (
+ "bufio"
+ "bytes"
+ "fmt"
+ "time"
+)
+
+import (
+ "github.com/pkg/errors"
+)
+
+// PackageType is a bit set describing the kind of a Dubbo package; the
+// DubboPackage predicates test it with bit masks.
+type PackageType int
+
+// Bit values of PackageType. Each kind occupies its own bit so that kinds can
+// be combined (e.g. PackageResponse | PackageResponse_Exception).
+const (
+ PackageError = PackageType(0x01)
+ PackageRequest = PackageType(0x02)
+ PackageResponse = PackageType(0x04)
+ PackageHeartbeat = PackageType(0x08)
+ PackageRequest_TwoWay = PackageType(0x10)
+ PackageResponse_Exception = PackageType(0x20)
+ // NOTE(review): untyped constant; 0x2f is not the OR of all bits above
+ // (that would be 0x3f) - confirm the intended value with its users.
+ PackageType_BitSize = 0x2f
+)
+
+// DubboHeader holds the decoded header fields of a Dubbo package.
+type DubboHeader struct {
+ SerialID byte // serialization identifier
+ Type PackageType // bit set of PackageType values
+ ID int64 // package ID, see DubboPackage.SetID
+ BodyLen int // body length; GetLen adds HEADER_LENGTH to it
+ ResponseStatus byte // response status; marshalResponse compares it with Response_OK
+}
+
+// Service defines service instance: the addressing information of the remote
+// service a package refers to. The request marshalers copy these fields into
+// the request preamble and attachments.
+type Service struct {
+ Path string
+ Interface string
+ Group string
+ Version string
+ Method string
+ Timeout time.Duration // request timeout
+}
+
+// DubboPackage is one Dubbo message: header, service addressing, body and the
+// codec used to encode/decode it.
+type DubboPackage struct {
+ Header DubboHeader
+ Service Service
+ Body interface{} // payload; its concrete type depends on direction and serializer
+ Err error
+ Codec *DubboCodec // used by ReadHeader, Marshal, Unmarshal and SetSerializer
+}
+
+// String renders the header, service and body of the package for logging.
+func (p DubboPackage) String() string {
+ return fmt.Sprintf("HessianPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body)
+}
+
+// ReadHeader lets the codec read the package header into p.Header.
+// Like Marshal and Unmarshal it returns an error when no codec is set instead
+// of calling through a nil Codec.
+func (p *DubboPackage) ReadHeader() error {
+	if p.Codec == nil {
+		return errors.New("Codec is nil")
+	}
+	return p.Codec.ReadHeader(&p.Header)
+}
+
+// Marshal encodes the package with its codec and returns the bytes wrapped in
+// a buffer. It fails when no codec is set or when encoding fails.
+func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
+	if p.Codec == nil {
+		return nil, errors.New("Codec is nil")
+	}
+	encoded, err := p.Codec.Encode(*p)
+	if err == nil {
+		return bytes.NewBuffer(encoded), nil
+	}
+	return nil, errors.WithStack(err)
+}
+
+// Unmarshal decodes the package in place with its codec. It fails when no
+// codec is set; otherwise the codec's result is returned unchanged.
+func (p *DubboPackage) Unmarshal() error {
+	if p.Codec != nil {
+		return p.Codec.Decode(p)
+	}
+	return errors.New("Codec is nil")
+}
+
+// IsHeartBeat reports whether the PackageHeartbeat bit is set in the header type.
+func (p DubboPackage) IsHeartBeat() bool {
+ return p.Header.Type&PackageHeartbeat != 0
+}
+
+// IsRequest reports whether the header type has the PackageRequest or the
+// PackageRequest_TwoWay bit set.
+func (p DubboPackage) IsRequest() bool {
+ return p.Header.Type&(PackageRequest_TwoWay|PackageRequest) != 0
+}
+
+// IsResponse reports whether the header type is exactly PackageResponse; a type
+// carrying any additional bit (for example PackageResponse_Exception) does not
+// count.
+func (p DubboPackage) IsResponse() bool {
+ return p.Header.Type == PackageResponse
+}
+
+// IsResponseWithException reports whether both the PackageResponse and the
+// PackageResponse_Exception bits are set.
+func (p DubboPackage) IsResponseWithException() bool {
+ flag := PackageResponse | PackageResponse_Exception
+ return p.Header.Type&flag == flag
+}
+
+// GetBodyLen returns the body length recorded in the header.
+func (p DubboPackage) GetBodyLen() int {
+ return p.Header.BodyLen
+}
+
+// GetLen returns the total package length: HEADER_LENGTH plus the body length.
+func (p DubboPackage) GetLen() int {
+ return HEADER_LENGTH + p.Header.BodyLen
+}
+
+// GetBody returns the package body.
+func (p DubboPackage) GetBody() interface{} {
+ return p.Body
+}
+
+// SetBody replaces the package body.
+func (p *DubboPackage) SetBody(body interface{}) {
+ p.Body = body
+}
+
+// SetHeader replaces the package header.
+func (p *DubboPackage) SetHeader(header DubboHeader) {
+ p.Header = header
+}
+
+// SetService replaces the service description of the package.
+func (p *DubboPackage) SetService(svc Service) {
+ p.Service = svc
+}
+
+// SetID sets the ID stored in the header.
+func (p *DubboPackage) SetID(id int64) {
+ p.Header.ID = id
+}
+
+// GetHeader returns a copy of the package header.
+func (p DubboPackage) GetHeader() DubboHeader {
+ return p.Header
+}
+
+// GetService returns a copy of the service description.
+func (p DubboPackage) GetService() Service {
+ return p.Service
+}
+
+// SetResponseStatus sets the response status stored in the header.
+func (p *DubboPackage) SetResponseStatus(status byte) {
+ p.Header.ResponseStatus = status
+}
+
+// SetSerializer forwards the serializer to the codec. p.Codec is used without
+// a nil check.
+func (p *DubboPackage) SetSerializer(serializer Serializer) {
+ p.Codec.SetSerializer(serializer)
+}
+
+// NewDubboPackage creates an empty package whose codec reads from data.
+// With a nil buffer the codec is created without a reader; otherwise the buffer
+// is wrapped in a bufio.Reader sized to the number of unread bytes.
+func NewDubboPackage(data *bytes.Buffer) *DubboPackage {
+	var codec *DubboCodec
+	switch {
+	case data == nil:
+		codec = NewDubboCodec(nil)
+	default:
+		codec = NewDubboCodec(bufio.NewReaderSize(data, data.Len()))
+	}
+	// all other fields start at their zero value
+	return &DubboPackage{Codec: codec}
+}
diff --git a/protocol/dubbo/impl/proto.go b/protocol/dubbo/impl/proto.go
new file mode 100644
index 0000000..ea1c55d
--- /dev/null
+++ b/protocol/dubbo/impl/proto.go
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package impl
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "io"
+ "reflect"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+)
+
+import (
+ "github.com/golang/protobuf/proto"
+ "github.com/matttproud/golang_protobuf_extensions/pbutil"
+ "github.com/pkg/errors"
+)
+
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
+ pb "github.com/apache/dubbo-go/protocol/dubbo/impl/proto"
+)
+
+// ProtoSerializer marshals and unmarshals DubboPackage bodies as
+// length-delimited protobuf messages. It carries no state; init registers it
+// under the name "protobuf".
+type ProtoSerializer struct{}
+
+// Marshal encodes pkg. Heartbeats are encoded as the single byte 'N'; any other
+// package needs a non-nil body and is encoded as a request or a response
+// depending on its type.
+func (p ProtoSerializer) Marshal(pkg DubboPackage) ([]byte, error) {
+	switch {
+	case pkg.IsHeartBeat():
+		return []byte{byte('N')}, nil
+	case pkg.Body == nil:
+		return nil, errors.New("package body should not be nil")
+	case pkg.IsRequest():
+		return marshalRequestProto(pkg)
+	}
+	return marshalResponseProto(pkg)
+}
+
+// Unmarshal decodes data into pkg as a request or a response depending on the
+// package type.
+//
+// Heartbeat packages are accepted without decoding: Marshal writes them as the
+// single byte 'N', which is not a length-delimited protobuf message and would
+// otherwise fail to parse. This mirrors HessianSerializer.Unmarshal.
+func (p ProtoSerializer) Unmarshal(data []byte, pkg *DubboPackage) error {
+	if pkg.IsHeartBeat() {
+		return nil
+	}
+	if pkg.IsRequest() {
+		return unmarshalRequestProto(data, pkg)
+	}
+	return unmarshalResponseProto(data, pkg)
+}
+
+// unmarshalResponseProto decodes a protobuf response body into the
+// ResponsePayload held by pkg (one is created when pkg.Body is nil).
+//
+// The body starts with the response type. Depending on it an exception message,
+// a value (read into response.RspObj, which must already be a proto.Message) or
+// nothing follows, and for the *_WITH_ATTACHMENTS types an attachment map that
+// is merged into response.Attachments.
+//
+// The null-value response types carry no value, so no value is read for them;
+// previously they fell through to the value branch and failed on the missing
+// message.
+func unmarshalResponseProto(data []byte, pkg *DubboPackage) error {
+	if pkg.Body == nil {
+		pkg.SetBody(NewResponsePayload(nil, nil, nil))
+	}
+	response := EnsureResponsePayload(pkg.Body)
+	buf := bytes.NewBuffer(data)
+
+	var responseType int32
+	if err := readByte(buf, &responseType); err != nil {
+		return err
+	}
+
+	hasAttachments := false
+	hasException := false
+	hasValue := true
+	switch responseType {
+	case RESPONSE_VALUE_WITH_ATTACHMENTS:
+		hasAttachments = true
+	case RESPONSE_WITH_EXCEPTION:
+		hasException = true
+	case RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
+		hasAttachments = true
+		hasException = true
+	case RESPONSE_NULL_VALUE:
+		hasValue = false
+	case RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
+		hasValue = false
+		hasAttachments = true
+	}
+
+	switch {
+	case hasException:
+		throwable := pb.ThrowableProto{}
+		if err := readObject(buf, &throwable); err != nil {
+			return err
+		}
+		// generate error only with error message
+		response.Exception = errors.New(throwable.OriginalMessage)
+	case hasValue:
+		// read response body
+		protoMsg, ok := response.RspObj.(proto.Message)
+		if !ok {
+			return errors.New("response rspobj not protobuf message")
+		}
+		if err := readObject(buf, protoMsg); err != nil {
+			return err
+		}
+	}
+
+	if hasAttachments {
+		atta := pb.Map{}
+		if err := readObject(buf, &atta); err != nil {
+			return err
+		}
+		if response.Attachments == nil {
+			response.Attachments = atta.Attachments
+		} else {
+			// keep existing entries, let decoded ones win on conflict
+			for k, v := range atta.Attachments {
+				response.Attachments[k] = v
+			}
+		}
+	}
+	return nil
+}
+
+// unmarshalRequestProto decodes a protobuf request body into pkg.
+//
+// Wire order: dubbo version, service path, service version, method name,
+// argument type descriptor, the single length-delimited argument message and
+// the attachment map. The argument is decoded into a new instance of the type
+// registered for the descriptor (see RegisterMessage). On success pkg receives
+// its Service and a map body with the keys "dubboVersion", "args", "service"
+// and "attachments".
+//
+// The data comes from the network: a message length exceeding the remaining
+// bytes, a short read or an undecodable argument is returned as an error.
+// getRegisterMessage still panics for a descriptor that was never registered.
+func unmarshalRequestProto(data []byte, pkg *DubboPackage) error {
+	var dubboVersion string
+	var svcPath string
+	var svcVersion string
+	var svcMethod string
+	buf := bytes.NewBuffer(data)
+	if err := readUTF(buf, &dubboVersion); err != nil {
+		return err
+	}
+	if err := readUTF(buf, &svcPath); err != nil {
+		return err
+	}
+	if err := readUTF(buf, &svcVersion); err != nil {
+		return err
+	}
+	if err := readUTF(buf, &svcMethod); err != nil {
+		return err
+	}
+	// NOTE: protobuf rpc just have exact one parameter, while golang doesn't need this field
+	var argsType string
+	if err := readUTF(buf, &argsType); err != nil {
+		return err
+	}
+	// get raw body bytes for proxy methods to unmarshal
+	var protoMsgLength int
+	if err := readDelimitedLength(buf, &protoMsgLength); err != nil {
+		return err
+	}
+	// never allocate more than the buffer can actually deliver
+	if protoMsgLength < 0 || protoMsgLength > buf.Len() {
+		return errors.New("illegal msg length")
+	}
+	argBytes := make([]byte, protoMsgLength)
+	if _, err := io.ReadFull(buf, argBytes); err != nil {
+		return err
+	}
+	arg := getRegisterMessage(argsType)
+	msg, ok := arg.Interface().(JavaProto)
+	if !ok {
+		return errors.Errorf("registered type for %v is not a JavaProto", argsType)
+	}
+	if err := proto.Unmarshal(argBytes, msg); err != nil {
+		return errors.WithStack(err)
+	}
+
+	m := &pb.Map{}
+	if err := readObject(buf, m); err != nil {
+		return err
+	}
+	svc := Service{}
+	svc.Version = svcVersion
+	svc.Method = svcMethod
+	// just as hessian
+	svc.Path = svcPath
+	if svc.Path == "" && len(m.Attachments[constant.PATH_KEY]) > 0 {
+		svc.Path = m.Attachments[constant.PATH_KEY]
+	}
+
+	if iface, ok := m.Attachments[constant.INTERFACE_KEY]; ok {
+		svc.Interface = iface
+	} else {
+		svc.Interface = svc.Path
+	}
+	pkg.SetService(svc)
+	pkg.SetBody(map[string]interface{}{
+		"dubboVersion": dubboVersion,
+		"args":         []interface{}{arg.Interface()},
+		"service":      common.ServiceMap.GetService(DUBBO, svc.Path), // path as a key
+		"attachments":  m.Attachments,
+	})
+
+	return nil
+}
+
+// marshalRequestProto encodes a request package as a sequence of
+// length-delimited protobuf messages: dubbo version, service path, service
+// version, method name, argument type descriptor, the argument itself and the
+// attachments (path, version and, when set, group, interface and timeout in
+// milliseconds).
+//
+// A protobuf request carries exactly one argument, which must be a
+// proto.Message. The type descriptor is taken from its JavaClassName method
+// when it has one and is empty otherwise. A missing, nil or non-protobuf
+// argument is reported as an error instead of causing a panic.
+func marshalRequestProto(pkg DubboPackage) ([]byte, error) {
+	request := EnsureRequestPayload(pkg.Body)
+	args, ok := request.Params.([]interface{})
+	if !ok {
+		return nil, errors.New("proto buffer args should be marshaled in []byte")
+	}
+	// NOTE: protobuf rpc just has exact one parameter
+	if len(args) != 1 {
+		return nil, errors.New("illegal protobuf service, len(arg) should equal 1")
+	}
+	// Checked up front: this also rejects a nil argument, on which the
+	// reflection below would panic.
+	protoMsg, ok := args[0].(proto.Message)
+	if !ok {
+		return nil, errors.New("protobuf request argument should be a proto.Message")
+	}
+
+	buf := bytes.NewBuffer(make([]byte, 0))
+	// dubbo version
+	if err := writeUTF(buf, DUBBO_VERSION); err != nil {
+		return nil, err
+	}
+	// service path
+	if err := writeUTF(buf, pkg.Service.Path); err != nil {
+		return nil, err
+	}
+	// service version
+	if err := writeUTF(buf, pkg.Service.Version); err != nil {
+		return nil, err
+	}
+	// service method
+	if err := writeUTF(buf, pkg.Service.Method); err != nil {
+		return nil, err
+	}
+	// parameter types desc
+	v := reflect.ValueOf(args[0])
+	mv := v.MethodByName("JavaClassName")
+	if mv.IsValid() {
+		javaCls := mv.Call([]reflect.Value{})
+		if len(javaCls) != 1 {
+			return nil, errors.New("JavaStringName method should return exact 1 result")
+		}
+		javaClsStr, ok := javaCls[0].Interface().(string)
+		if !ok {
+			return nil, errors.New("JavaClassName method should return string")
+		}
+		if err := writeUTF(buf, getJavaArgType(javaClsStr)); err != nil {
+			return nil, err
+		}
+	} else {
+		// defensive code
+		if err := writeUTF(buf, ""); err != nil {
+			return nil, err
+		}
+	}
+	// consumer args
+	if err := writeObject(buf, protoMsg); err != nil {
+		return nil, err
+	}
+	// attachments
+	atta := make(map[string]string)
+	atta[PATH_KEY] = pkg.Service.Path
+	atta[VERSION_KEY] = pkg.Service.Version
+	if len(pkg.Service.Group) > 0 {
+		atta[GROUP_KEY] = pkg.Service.Group
+	}
+	if len(pkg.Service.Interface) > 0 {
+		atta[INTERFACE_KEY] = pkg.Service.Interface
+	}
+	if pkg.Service.Timeout != 0 {
+		atta[TIMEOUT_KEY] = strconv.Itoa(int(pkg.Service.Timeout / time.Millisecond))
+	}
+	m := pb.Map{Attachments: atta}
+	if err := writeObject(buf, &m); err != nil {
+		return nil, err
+	}
+	return buf.Bytes(), nil
+}
+
+// marshalResponseProto encodes a response package as length-delimited protobuf
+// messages: the response type, then either the exception (only its message) or
+// the result, then the attachments when the payload has a non-nil attachment
+// map. A result that is not a proto.Message is rejected.
+func marshalResponseProto(pkg DubboPackage) ([]byte, error) {
+	response := EnsureResponsePayload(pkg.Body)
+	out := bytes.NewBuffer(make([]byte, 0))
+
+	withAttachments := response.Attachments != nil
+	withException := response.Exception != nil
+
+	// the response type encodes both decisions
+	responseType := RESPONSE_VALUE
+	switch {
+	case withException && withAttachments:
+		responseType = RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS
+	case withException:
+		responseType = RESPONSE_WITH_EXCEPTION
+	case withAttachments:
+		responseType = RESPONSE_VALUE_WITH_ATTACHMENTS
+	}
+	// write response type
+	if err := writeByte(out, responseType); err != nil {
+		return nil, err
+	}
+
+	if withException {
+		// deal with exception
+		throwable := pb.ThrowableProto{OriginalMessage: response.Exception.Error()}
+		if err := writeObject(out, &throwable); err != nil {
+			return nil, err
+		}
+	} else {
+		result, isProto := response.RspObj.(proto.Message)
+		if !isProto {
+			return nil, errors.New("proto buffer params should be marshaled in proto.Message")
+		}
+		// response body
+		if err := writeObject(out, result); err != nil {
+			return nil, err
+		}
+	}
+
+	if withAttachments {
+		attachments := pb.Map{Attachments: response.Attachments}
+		if err := writeObject(out, &attachments); err != nil {
+			return nil, err
+		}
+	}
+	return out.Bytes(), nil
+}
+
+// init registers ProtoSerializer under the name "protobuf".
+func init() {
+ SetSerializer("protobuf", ProtoSerializer{})
+}
+
+// getJavaArgType turns a dotted Java class name into its class descriptor:
+// dots become slashes and the result is wrapped in "L" and ";"
+// (java.lang.String -> Ljava/lang/String;).
+func getJavaArgType(javaClsName string) string {
+	slashed := strings.ReplaceAll(javaClsName, ".", "/")
+	return fmt.Sprintf("L%s;", slashed)
+}
+
+// writeUTF writes value to writer as a length-delimited pb.StringValue.
+func writeUTF(writer io.Writer, value string) error {
+	wrapped := &pb.StringValue{Value: value}
+	_, err := pbutil.WriteDelimited(writer, wrapped)
+	return err
+}
+
+// writeObject writes value to writer as a length-delimited protobuf message.
+func writeObject(writer io.Writer, value proto.Message) error {
+	if _, err := pbutil.WriteDelimited(writer, value); err != nil {
+		return err
+	}
+	return nil
+}
+
+// writeByte writes v to writer as a length-delimited pb.Int32Value.
+func writeByte(writer io.Writer, v int32) error {
+	_, err := pbutil.WriteDelimited(writer, &pb.Int32Value{Value: v})
+	return err
+}
+
+// readUTF reads one length-delimited pb.StringValue from reader and stores its
+// value in *value. On error *value is left untouched.
+func readUTF(reader io.Reader, value *string) error {
+	var wrapped pb.StringValue
+	if _, err := pbutil.ReadDelimited(reader, &wrapped); err != nil {
+		return err
+	}
+	*value = wrapped.Value
+	return nil
+}
+
+// readObject reads one length-delimited protobuf message from reader into value.
+func readObject(reader io.Reader, value proto.Message) error {
+	_, err := pbutil.ReadDelimited(reader, value)
+	return err
+}
+
+// readByte reads one length-delimited pb.Int32Value from reader and stores its
+// value in *value (just as java protobuf serialize). On error *value is left
+// untouched.
+func readByte(reader io.Reader, value *int32) error {
+	var wrapped pb.Int32Value
+	if _, err := pbutil.ReadDelimited(reader, &wrapped); err != nil {
+		return err
+	}
+	*value = wrapped.Value
+	return nil
+}
+
+// readDelimitedLength reads the varint length prefix of a length-delimited
+// message from reader and stores it in *length, consuming only the bytes of
+// the prefix. It fails with "invalid varint32 encountered" when no varint could
+// be decoded from binary.MaxVarintLen32 bytes, and returns the reader's error
+// when a read yields no data.
+func readDelimitedLength(reader io.Reader, length *int) error {
+ var headerBuf [binary.MaxVarintLen32]byte
+ var bytesRead, varIntBytes int
+ var messageLength uint64
+ for varIntBytes == 0 { // i.e. no varint has been decoded yet.
+ if bytesRead >= len(headerBuf) {
+ return errors.New("invalid varint32 encountered")
+ }
+ // We have to read byte by byte here to avoid reading more bytes
+ // than required. Each read byte is appended to what we have
+ // read before.
+ newBytesRead, err := reader.Read(headerBuf[bytesRead : bytesRead+1])
+ if newBytesRead == 0 {
+ if err != nil {
+ return err
+ }
+ // A Reader should not return (0, nil), but if it does,
+ // it should be treated as no-op (according to the
+ // Reader contract). So let's go on...
+ continue
+ }
+ bytesRead += newBytesRead
+ // Now present everything read so far to the varint decoder and
+ // see if a varint can be decoded already.
+ messageLength, varIntBytes = proto.DecodeVarint(headerBuf[:bytesRead])
+ }
+ *length = int(messageLength)
+ return nil
+}
+
+// JavaProto is a protobuf message that also knows the name of its Java class.
+// RegisterMessage uses that name to build the registry key.
+type JavaProto interface {
+ JavaClassName() string
+ proto.Message
+}
+
+// Register maps Java class descriptors (as built by getJavaArgType) to the Go
+// type of the registered message. The embedded RWMutex guards the map.
+type Register struct {
+ sync.RWMutex
+ registry map[string]reflect.Type
+}
+
+var (
+ // register is the package-wide message registry filled by RegisterMessage
+ // and queried by getRegisterMessage.
+ register = Register{
+ registry: make(map[string]reflect.Type),
+ }
+)
+
+// RegisterMessage records the Go type of msg under the Java class descriptor
+// derived from msg.JavaClassName(), so that incoming requests announcing that
+// descriptor can be decoded. It panics when the descriptor is already
+// registered.
+func RegisterMessage(msg JavaProto) {
+	register.Lock()
+	defer register.Unlock()
+
+	sig := getJavaArgType(msg.JavaClassName())
+	if existing, dup := register.registry[sig]; dup {
+		panic(fmt.Sprintf("msg: %v has been registered. existed: %v", msg.JavaClassName(), existing))
+	}
+	register.registry[sig] = typeOfMessage(msg)
+}
+
+// getRegisterMessage returns a pointer Value to a new zero instance of the type
+// registered for the descriptor sig. It panics when nothing is registered for
+// sig.
+//
+// The lookup only reads the registry, so the shared (read) lock is taken and
+// concurrent lookups do not serialise each other.
+func getRegisterMessage(sig string) reflect.Value {
+	register.RLock()
+	defer register.RUnlock()
+
+	t, ok := register.registry[sig]
+	if !ok {
+		panic(fmt.Sprintf("registry dose not have for svc: %v", sig))
+	}
+	return reflect.New(t)
+}
+
+// typeOfMessage returns the Go type of o, with one level of pointer removed
+// when o is a pointer. The result is derived from the static type information
+// only, so a typed nil pointer is handled as well (dereferencing its Value
+// would panic). A nil interface yields a nil type.
+func typeOfMessage(o proto.Message) reflect.Type {
+	t := reflect.TypeOf(o)
+	if t != nil && t.Kind() == reflect.Ptr {
+		return t.Elem()
+	}
+	return t
+}
diff --git a/protocol/dubbo/impl/proto/payload.pb.go b/protocol/dubbo/impl/proto/payload.pb.go
new file mode 100644
index 0000000..337027e
--- /dev/null
+++ b/protocol/dubbo/impl/proto/payload.pb.go
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: proto/payload.proto
+
+package payload
+
+import (
+ fmt "fmt"
+ math "math"
+
+ proto "github.com/golang/protobuf/proto"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+
+// equivalent java StringValue
+type StringValue struct {
+ Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *StringValue) Reset() { *m = StringValue{} }
+func (m *StringValue) String() string { return proto.CompactTextString(m) }
+func (*StringValue) ProtoMessage() {}
+func (*StringValue) Descriptor() ([]byte, []int) {
+ return fileDescriptor_434bbf44284586dc, []int{0}
+}
+
+func (m *StringValue) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_StringValue.Unmarshal(m, b)
+}
+func (m *StringValue) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_StringValue.Marshal(b, m, deterministic)
+}
+func (m *StringValue) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_StringValue.Merge(m, src)
+}
+func (m *StringValue) XXX_Size() int {
+ return xxx_messageInfo_StringValue.Size(m)
+}
+func (m *StringValue) XXX_DiscardUnknown() {
+ xxx_messageInfo_StringValue.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_StringValue proto.InternalMessageInfo
+
+func (m *StringValue) GetValue() string {
+ if m != nil {
+ return m.Value
+ }
+ return ""
+}
+
+// equivalent java Int32Value
+type Int32Value struct {
+ Value int32 `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *Int32Value) Reset() { *m = Int32Value{} }
+func (m *Int32Value) String() string { return proto.CompactTextString(m) }
+func (*Int32Value) ProtoMessage() {}
+func (*Int32Value) Descriptor() ([]byte, []int) {
+ return fileDescriptor_434bbf44284586dc, []int{1}
+}
+
+func (m *Int32Value) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_Int32Value.Unmarshal(m, b)
+}
+func (m *Int32Value) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_Int32Value.Marshal(b, m, deterministic)
+}
+func (m *Int32Value) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_Int32Value.Merge(m, src)
+}
+func (m *Int32Value) XXX_Size() int {
+ return xxx_messageInfo_Int32Value.Size(m)
+}
+func (m *Int32Value) XXX_DiscardUnknown() {
+ xxx_messageInfo_Int32Value.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Int32Value proto.InternalMessageInfo
+
+func (m *Int32Value) GetValue() int32 {
+ if m != nil {
+ return m.Value
+ }
+ return 0
+}
+
+// equivalent java MapValue
+type Map struct {
+ Attachments map[string]string `protobuf:"bytes,1,rep,name=attachments,proto3" json:"attachments,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *Map) Reset() { *m = Map{} }
+func (m *Map) String() string { return proto.CompactTextString(m) }
+func (*Map) ProtoMessage() {}
+func (*Map) Descriptor() ([]byte, []int) {
+ return fileDescriptor_434bbf44284586dc, []int{2}
+}
+
+func (m *Map) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_Map.Unmarshal(m, b)
+}
+func (m *Map) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_Map.Marshal(b, m, deterministic)
+}
+func (m *Map) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_Map.Merge(m, src)
+}
+func (m *Map) XXX_Size() int {
+ return xxx_messageInfo_Map.Size(m)
+}
+func (m *Map) XXX_DiscardUnknown() {
+ xxx_messageInfo_Map.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Map proto.InternalMessageInfo
+
+func (m *Map) GetAttachments() map[string]string {
+ if m != nil {
+ return m.Attachments
+ }
+ return nil
+}
+
+// copied from dubbo GenericProtobufObjectOutput.java
+// Messages used for transporting debug information between server and client.
+// An element in a stack trace, based on the Java type of the same name.
+//
+// See: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/StackTraceElement.html
+type StackTraceElementProto struct {
+ // The fully qualified name of the class containing the execution point
+ // represented by the stack trace element.
+ ClassName string `protobuf:"bytes,1,opt,name=class_name,json=className,proto3" json:"class_name,omitempty"`
+ // The name of the method containing the execution point represented by the
+ // stack trace element
+ MethodName string `protobuf:"bytes,2,opt,name=method_name,json=methodName,proto3" json:"method_name,omitempty"`
+ // The name of the file containing the execution point represented by the
+ // stack trace element, or null if this information is unavailable.
+ FileName string `protobuf:"bytes,3,opt,name=file_name,json=fileName,proto3" json:"file_name,omitempty"`
+ // The line number of the source line containing the execution point represented
+ // by this stack trace element, or a negative number if this information is
+ // unavailable.
+ LineNumber int32 `protobuf:"varint,4,opt,name=line_number,json=lineNumber,proto3" json:"line_number,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *StackTraceElementProto) Reset() { *m = StackTraceElementProto{} }
+func (m *StackTraceElementProto) String() string { return proto.CompactTextString(m) }
+func (*StackTraceElementProto) ProtoMessage() {}
+func (*StackTraceElementProto) Descriptor() ([]byte, []int) {
+ return fileDescriptor_434bbf44284586dc, []int{3}
+}
+
+func (m *StackTraceElementProto) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_StackTraceElementProto.Unmarshal(m, b)
+}
+func (m *StackTraceElementProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_StackTraceElementProto.Marshal(b, m, deterministic)
+}
+func (m *StackTraceElementProto) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_StackTraceElementProto.Merge(m, src)
+}
+func (m *StackTraceElementProto) XXX_Size() int {
+ return xxx_messageInfo_StackTraceElementProto.Size(m)
+}
+func (m *StackTraceElementProto) XXX_DiscardUnknown() {
+ xxx_messageInfo_StackTraceElementProto.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_StackTraceElementProto proto.InternalMessageInfo
+
+func (m *StackTraceElementProto) GetClassName() string {
+ if m != nil {
+ return m.ClassName
+ }
+ return ""
+}
+
+func (m *StackTraceElementProto) GetMethodName() string {
+ if m != nil {
+ return m.MethodName
+ }
+ return ""
+}
+
+func (m *StackTraceElementProto) GetFileName() string {
+ if m != nil {
+ return m.FileName
+ }
+ return ""
+}
+
+func (m *StackTraceElementProto) GetLineNumber() int32 {
+ if m != nil {
+ return m.LineNumber
+ }
+ return 0
+}
+
+// An exception that was thrown by some code, based on the Java type of the same name.
+//
+// See: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Throwable.html
+type ThrowableProto struct {
+ // The name of the class of the exception that was actually thrown. Downstream readers
+ // of this message may or may not have the actual class available to initialize, so
+ // this is just used to prefix the message of a generic exception type.
+ OriginalClassName string `protobuf:"bytes,1,opt,name=original_class_name,json=originalClassName,proto3" json:"original_class_name,omitempty"`
+ // The message of this throwable. Not filled if there is no message.
+ OriginalMessage string `protobuf:"bytes,2,opt,name=original_message,json=originalMessage,proto3" json:"original_message,omitempty"`
+ // The stack trace of this Throwable.
+ StackTrace []*StackTraceElementProto `protobuf:"bytes,3,rep,name=stack_trace,json=stackTrace,proto3" json:"stack_trace,omitempty"`
+ // The cause of this Throwable. Not filled if there is no cause.
+ Cause *ThrowableProto `protobuf:"bytes,4,opt,name=cause,proto3" json:"cause,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *ThrowableProto) Reset() { *m = ThrowableProto{} }
+func (m *ThrowableProto) String() string { return proto.CompactTextString(m) }
+func (*ThrowableProto) ProtoMessage() {}
+func (*ThrowableProto) Descriptor() ([]byte, []int) {
+ return fileDescriptor_434bbf44284586dc, []int{4}
+}
+
+func (m *ThrowableProto) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_ThrowableProto.Unmarshal(m, b)
+}
+func (m *ThrowableProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_ThrowableProto.Marshal(b, m, deterministic)
+}
+func (m *ThrowableProto) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_ThrowableProto.Merge(m, src)
+}
+func (m *ThrowableProto) XXX_Size() int {
+ return xxx_messageInfo_ThrowableProto.Size(m)
+}
+func (m *ThrowableProto) XXX_DiscardUnknown() {
+ xxx_messageInfo_ThrowableProto.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ThrowableProto proto.InternalMessageInfo
+
+func (m *ThrowableProto) GetOriginalClassName() string {
+ if m != nil {
+ return m.OriginalClassName
+ }
+ return ""
+}
+
+func (m *ThrowableProto) GetOriginalMessage() string {
+ if m != nil {
+ return m.OriginalMessage
+ }
+ return ""
+}
+
+func (m *ThrowableProto) GetStackTrace() []*StackTraceElementProto {
+ if m != nil {
+ return m.StackTrace
+ }
+ return nil
+}
+
+func (m *ThrowableProto) GetCause() *ThrowableProto {
+ if m != nil {
+ return m.Cause
+ }
+ return nil
+}
+
+func init() {
+ proto.RegisterType((*StringValue)(nil), "StringValue")
+ proto.RegisterType((*Int32Value)(nil), "Int32Value")
+ proto.RegisterType((*Map)(nil), "Map")
+ proto.RegisterMapType((map[string]string)(nil), "Map.AttachmentsEntry")
+ proto.RegisterType((*StackTraceElementProto)(nil), "StackTraceElementProto")
+ proto.RegisterType((*ThrowableProto)(nil), "ThrowableProto")
+}
+
+func init() { proto.RegisterFile("proto/payload.proto", fileDescriptor_434bbf44284586dc) }
+
+var fileDescriptor_434bbf44284586dc = []byte{
+ // 353 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x92, 0x4f, 0x4f, 0xea, 0x40,
+ 0x14, 0xc5, 0x53, 0xfa, 0x78, 0x79, 0xdc, 0x26, 0x0f, 0x1c, 0xfc, 0xd3, 0x68, 0x8c, 0xa4, 0xc6,
+ 0x04, 0x37, 0x35, 0x81, 0x85, 0xc4, 0x85, 0x89, 0x31, 0x2c, 0x5c, 0x40, 0x4c, 0x21, 0x6e, 0x9b,
+ 0x4b, 0x19, 0xa1, 0x61, 0x3a, 0x6d, 0x66, 0x06, 0x0d, 0x1b, 0x3f, 0x86, 0x9f, 0xca, 0x0f, 0x65,
+ 0x66, 0xc6, 0x02, 0x2a, 0xbb, 0x99, 0xdf, 0x39, 0xbd, 0x3d, 0xf7, 0x64, 0xa0, 0x59, 0x88, 0x5c,
+ 0xe5, 0x57, 0x05, 0xae, 0x58, 0x8e, 0xd3, 0xd0, 0xdc, 0x82, 0x73, 0xf0, 0x46, 0x4a, 0xa4, 0x7c,
+ 0xf6, 0x84, 0x6c, 0x49, 0xc9, 0x3e, 0x54, 0x5f, 0xf4, 0xc1, 0x77, 0x5a, 0x4e, 0xbb, 0x16, 0xd9,
+ 0x4b, 0x10, 0x00, 0x3c, 0x70, 0xd5, 0xed, 0xec, 0xf0, 0x54, 0x4b, 0xcf, 0x1b, 0xb8, 0x03, 0x2c,
+ 0xc8, 0x35, 0x78, 0xa8, 0x14, 0x26, 0xf3, 0x8c, 0x72, 0x25, 0x7d, 0xa7, 0xe5, 0xb6, 0xbd, 0xce,
+ 0x41, 0x38, 0xc0, 0x22, 0xbc, 0xdb, 0xf0, 0x3e, 0x57, 0x62, 0x15, 0x6d, 0x3b, 0x8f, 0x6f, 0xa1,
+ 0xf1, 0xd3, 0x40, 0x1a, 0xe0, 0x2e, 0xe8, 0xea, 0x2b, 0x8b, 0x3e, 0x6e, 0xfe, 0x5d, 0xd9, 0xca,
+ 0x77, 0x53, 0xe9, 0x39, 0xc1, 0xbb, 0x03, 0x87, 0x23, 0x85, 0xc9, 0x62, 0x2c, 0x30, 0xa1, 0x7d,
+ 0x46, 0xf5, 0x9c, 0x47, 0xbd, 0x23, 0x39, 0x05, 0x48, 0x18, 0x4a, 0x19, 0x73, 0xcc, 0xca, 0xcd,
+ 0x6a, 0x86, 0x0c, 0x31, 0xa3, 0xe4, 0x0c, 0xbc, 0x8c, 0xaa, 0x79, 0x3e, 0xb5, 0xba, 0x9d, 0x0c,
+ 0x16, 0x19, 0xc3, 0x09, 0xd4, 0x9e, 0x53, 0x46, 0xad, 0xec, 0x1a, 0xf9, 0x9f, 0x06, 0xe5, 0xd7,
+ 0x2c, 0xe5, 0x34, 0xe6, 0xcb, 0x6c, 0x42, 0x85, 0xff, 0xc7, 0x74, 0x02, 0x1a, 0x0d, 0x0d, 0x09,
+ 0x3e, 0x1c, 0xf8, 0x3f, 0x9e, 0x8b, 0xfc, 0x15, 0x27, 0x8c, 0xda, 0x40, 0x21, 0x34, 0x73, 0x91,
+ 0xce, 0x52, 0x8e, 0x2c, 0xfe, 0x95, 0x6c, 0xaf, 0x94, 0xee, 0xd7, 0x09, 0x2f, 0xa1, 0xb1, 0xf6,
+ 0x67, 0x54, 0x4a, 0x9c, 0x95, 0x31, 0xeb, 0x25, 0x1f, 0x58, 0x4c, 0x7a, 0xe0, 0x49, 0xdd, 0x42,
+ 0xac, 0x74, 0x0d, 0xbe, 0x6b, 0xfa, 0x3f, 0x0a, 0x77, 0x37, 0x13, 0x81, 0x5c, 0x73, 0x72, 0x01,
+ 0xd5, 0x04, 0x97, 0x92, 0x9a, 0x15, 0xbc, 0x4e, 0x3d, 0xfc, 0x1e, 0x3a, 0xb2, 0xea, 0xe4, 0xaf,
+ 0x79, 0x37, 0xdd, 0xcf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x04, 0x4d, 0x68, 0x3a, 0x4e, 0x02, 0x00,
+ 0x00,
+}
diff --git a/protocol/dubbo/impl/proto/payload.proto b/protocol/dubbo/impl/proto/payload.proto
new file mode 100644
index 0000000..19f644e
--- /dev/null
+++ b/protocol/dubbo/impl/proto/payload.proto
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+// equivalent java StringValue
+message StringValue {
+ string value = 1;
+}
+
+// equivalent java Int32Value
+message Int32Value {
+ int32 value = 1;
+}
+
+// equivalent java MapValue
+message Map {
+ map<string, string> attachments = 1;
+}
+
+// copied from dubbo GenericProtobufObjectOutput.java
+// Messages used for transporting debug information between server and client.
+// An element in a stack trace, based on the Java type of the same name.
+//
+// See: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/StackTraceElement.html
+message StackTraceElementProto {
+ // The fully qualified name of the class containing the execution point
+ // represented by the stack trace element.
+ string class_name = 1;
+
+ // The name of the method containing the execution point represented by the
+ // stack trace element
+ string method_name = 2;
+
+ // The name of the file containing the execution point represented by the
+ // stack trace element, or null if this information is unavailable.
+ string file_name = 3;
+
+ // The line number of the source line containing the execution point represented
+ // by this stack trace element, or a negative number if this information is
+ // unavailable.
+ int32 line_number = 4;
+}
+
+// An exception that was thrown by some code, based on the Java type of the same name.
+//
+// See: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Throwable.html
+message ThrowableProto {
+ // The name of the class of the exception that was actually thrown. Downstream readers
+ // of this message may or may not have the actual class available to initialize, so
+ // this is just used to prefix the message of a generic exception type.
+ string original_class_name = 1;
+
+ // The message of this throwable. Not filled if there is no message.
+ string original_message = 2;
+
+ // The stack trace of this Throwable.
+ repeated StackTraceElementProto stack_trace = 3;
+
+ // The cause of this Throwable. Not filled if there is no cause.
+ ThrowableProto cause = 4;
+}
+
+
diff --git a/protocol/dubbo/client.go b/protocol/dubbo/impl/remoting/client_impl.go
similarity index 57%
copy from protocol/dubbo/client.go
copy to protocol/dubbo/impl/remoting/client_impl.go
index 6d1b771..e5f7c71 100644
--- a/protocol/dubbo/client.go
+++ b/protocol/dubbo/impl/remoting/client_impl.go
@@ -14,8 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package dubbo
+package remoting
import (
"math/rand"
@@ -25,19 +24,17 @@ import (
)
import (
- hessian "github.com/apache/dubbo-go-hessian2"
"github.com/dubbogo/getty"
gxsync "github.com/dubbogo/gost/sync"
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
- "gopkg.in/yaml.v2"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
- "github.com/apache/dubbo-go/config"
+ "github.com/apache/dubbo-go/protocol/dubbo/impl"
)
var (
@@ -46,48 +43,13 @@ var (
errSessionNotExist = perrors.New("session not exist")
errClientClosed = perrors.New("client closed")
errClientReadTimeout = perrors.New("client read timeout")
+)
+var (
clientConf *ClientConfig
clientGrpool *gxsync.TaskPool
)
-func init() {
-
- // load clientconfig from consumer_config
- // default use dubbo
- consumerConfig := config.GetConsumerConfig()
- if consumerConfig.ApplicationConfig == nil {
- return
- }
- protocolConf := config.GetConsumerConfig().ProtocolConf
- defaultClientConfig := GetDefaultClientConfig()
- if protocolConf == nil {
- logger.Info("protocol_conf default use dubbo config")
- } else {
- dubboConf := protocolConf.(map[interface{}]interface{})[DUBBO]
- if dubboConf == nil {
- logger.Warnf("dubboConf is nil")
- return
- }
- dubboConfByte, err := yaml.Marshal(dubboConf)
- if err != nil {
- panic(err)
- }
- err = yaml.Unmarshal(dubboConfByte, &defaultClientConfig)
- if err != nil {
- panic(err)
- }
- }
- clientConf = &defaultClientConfig
- if err := clientConf.CheckValidity(); err != nil {
- logger.Warnf("[CheckValidity] error: %v", err)
- return
- }
- setClientGrpool()
-
- rand.Seed(time.Now().UnixNano())
-}
-
// SetClientConf set dubbo client config.
func SetClientConf(c ClientConfig) {
clientConf = &c
@@ -100,8 +62,8 @@ func SetClientConf(c ClientConfig) {
}
// GetClientConf get dubbo client config.
-func GetClientConf() ClientConfig {
- return *clientConf
+func GetClientConf() *ClientConfig {
+ return clientConf
}
func setClientGrpool() {
@@ -131,12 +93,13 @@ type AsyncCallbackResponse struct {
// Client is dubbo protocol client.
type Client struct {
- opts Options
- conf ClientConfig
- pool *gettyRPCClientPool
- sequence atomic.Uint64
+ Opts Options
+ Conf ClientConfig
+ Pool *gettyRPCClientPool
+ Sequence atomic.Uint64
- pendingResponses *sync.Map
+ PendingResponses *sync.Map
+ codec impl.DubboCodec
}
// NewClient create a new Client.
@@ -150,19 +113,20 @@ func NewClient(opt Options) *Client {
opt.RequestTimeout = 3 * time.Second
}
- // make sure that client request sequence is an odd number
+ // make sure that client request Sequence is an odd number
initSequence := uint64(rand.Int63n(time.Now().UnixNano()))
if initSequence%2 == 0 {
initSequence++
}
c := &Client{
- opts: opt,
- pendingResponses: new(sync.Map),
- conf: *clientConf,
+ Opts: opt,
+ PendingResponses: new(sync.Map),
+ Conf: *clientConf,
+ codec: impl.DubboCodec{},
}
- c.sequence.Store(initSequence)
- c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
+ c.Sequence.Store(initSequence)
+ c.Pool = NewGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
return c
}
@@ -178,6 +142,10 @@ type Request struct {
// NewRequest create a new Request.
func NewRequest(addr string, svcUrl common.URL, method string, args interface{}, atta map[string]string) *Request {
+ // NOTE: compatible with old versions
+ if svcUrl.GetParam(constant.SERIALIZATION_KEY, "") == "" {
+ svcUrl.SetParam(constant.SERIALIZATION_KEY, constant.DEFAULT_SERIALIZATION)
+ }
return &Request{
addr: addr,
svcUrl: svcUrl,
@@ -189,68 +157,42 @@ func NewRequest(addr string, svcUrl common.URL, method string, args interface{},
// Response is dubbo protocol response.
type Response struct {
- reply interface{}
- atta map[string]string
+ Reply interface{}
+ Atta map[string]string
}
// NewResponse create a new Response.
func NewResponse(reply interface{}, atta map[string]string) *Response {
return &Response{
- reply: reply,
- atta: atta,
+ Reply: reply,
+ Atta: atta,
}
}
-// CallOneway call by one way
+// CallOneway calls the remote service one way, without waiting for a reply.
func (c *Client) CallOneway(request *Request) error {
- return perrors.WithStack(c.call(CT_OneWay, request, NewResponse(nil, nil), nil))
+ return perrors.WithStack(c.call(impl.CT_OneWay, request, NewResponse(nil, nil), nil))
}
-// Call call remoting by two way or one way, if @response.reply is nil, the way of call is one way.
+// Call calls the remote service; if @response.Reply is nil the call is made one way, otherwise two way.
func (c *Client) Call(request *Request, response *Response) error {
- ct := CT_TwoWay
- if response.reply == nil {
- ct = CT_OneWay
+
+ ct := impl.CT_TwoWay
+ if response.Reply == nil {
+ ct = impl.CT_OneWay
}
return perrors.WithStack(c.call(ct, request, response, nil))
}
-// AsyncCall call remoting by async with callback.
+// AsyncCall calls the remote service asynchronously with a callback.
func (c *Client) AsyncCall(request *Request, callback common.AsyncCallback, response *Response) error {
- return perrors.WithStack(c.call(CT_TwoWay, request, response, callback))
-}
-
-func (c *Client) call(ct CallType, request *Request, response *Response, callback common.AsyncCallback) error {
- p := &DubboPackage{}
- p.Service.Path = strings.TrimPrefix(request.svcUrl.Path, "/")
- p.Service.Interface = request.svcUrl.GetParam(constant.INTERFACE_KEY, "")
- p.Service.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "")
- p.Service.Group = request.svcUrl.GetParam(constant.GROUP_KEY, "")
- p.Service.Method = request.method
-
- p.Service.Timeout = c.opts.RequestTimeout
- var timeout = request.svcUrl.GetParam(strings.Join([]string{constant.METHOD_KEYS, request.method + constant.RETRIES_KEY}, "."), "")
- if len(timeout) != 0 {
- if t, err := time.ParseDuration(timeout); err == nil {
- p.Service.Timeout = t
- }
- }
-
- p.Header.SerialID = byte(S_Dubbo)
- p.Body = hessian.NewRequest(request.args, request.atta)
- var rsp *PendingResponse
- if ct != CT_OneWay {
- p.Header.Type = hessian.PackageRequest_TwoWay
- rsp = NewPendingResponse()
- rsp.response = response
- rsp.callback = callback
- } else {
- p.Header.Type = hessian.PackageRequest
- }
+ return perrors.WithStack(c.call(impl.CT_TwoWay, request, response, callback))
+}
+func (c *Client) call(ct impl.CallType, request *Request, response *Response, callback common.AsyncCallback) error {
var (
err error
session getty.Session
@@ -265,23 +207,60 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac
}
defer func() {
if err == nil {
- c.pool.put(conn)
+ c.Pool.put(conn)
return
}
conn.close()
}()
+ var rsp *PendingResponse
+ svc := impl.Service{}
+ header := impl.DubboHeader{}
+ svc.Path = strings.TrimPrefix(request.svcUrl.Path, "/")
+ svc.Interface = request.svcUrl.GetParam(constant.INTERFACE_KEY, "")
+ svc.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "")
+ svc.Group = request.svcUrl.GetParam(constant.GROUP_KEY, "")
+ svc.Method = request.method
+ svc.Timeout = c.Opts.RequestTimeout
+ var timeout = request.svcUrl.GetParam(strings.Join([]string{constant.METHOD_KEYS, request.method + constant.RETRIES_KEY}, "."), "")
+ if len(timeout) != 0 {
+ if t, err := time.ParseDuration(timeout); err == nil {
+ svc.Timeout = t
+ }
+ }
+ p := NewClientRequestPackage(header, svc)
+
+ serialization := request.svcUrl.GetParam(constant.SERIALIZATION_KEY, c.Conf.Serialization)
+ if serialization == constant.HESSIAN2_SERIALIZATION {
+ p.Header.SerialID = constant.S_Hessian2
+ } else if serialization == constant.PROTOBUF_SERIALIZATION {
+ p.Header.SerialID = constant.S_Proto
+ }
+ p.SetBody(impl.NewRequestPayload(request.args, request.atta))
+
+ if err := impl.LoadSerializer(p); err != nil {
+ return err
+ }
+
+ if ct != impl.CT_OneWay {
+ p.Header.Type = impl.PackageRequest_TwoWay
+ rsp = NewPendingResponse()
+ rsp.response = response
+ rsp.callback = callback
+ } else {
+ p.Header.Type = impl.PackageRequest
+ }
if err = c.transfer(session, p, rsp); err != nil {
return perrors.WithStack(err)
}
- if ct == CT_OneWay || callback != nil {
+ if ct == impl.CT_OneWay || callback != nil {
return nil
}
select {
- case <-getty.GetTimeWheel().After(c.opts.RequestTimeout):
- c.removePendingResponse(SequenceType(rsp.seq))
+ case <-getty.GetTimeWheel().After(c.Opts.RequestTimeout):
+ c.removePendingResponse(impl.SequenceType(rsp.seq))
return perrors.WithStack(errClientReadTimeout)
case <-rsp.done:
err = rsp.err
@@ -292,14 +271,14 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac
// Close close the client pool.
func (c *Client) Close() {
- if c.pool != nil {
- c.pool.close()
+ if c.Pool != nil {
+ c.Pool.close()
}
- c.pool = nil
+ c.Pool = nil
}
func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, error) {
- rpcClient, err := c.pool.getGettyRpcClient(DUBBO, addr)
+ rpcClient, err := c.Pool.getGettyRpcClient(impl.DUBBO, addr)
if err != nil {
return nil, nil, perrors.WithStack(err)
}
@@ -310,7 +289,7 @@ func (c *Client) heartbeat(session getty.Session) error {
return c.transfer(session, nil, NewPendingResponse())
}
-func (c *Client) transfer(session getty.Session, pkg *DubboPackage,
+func (c *Client) transfer(session getty.Session, pkg *impl.DubboPackage,
rsp *PendingResponse) error {
var (
@@ -318,16 +297,24 @@ func (c *Client) transfer(session getty.Session, pkg *DubboPackage,
err error
)
- sequence = c.sequence.Add(1)
+ sequence = c.Sequence.Add(1)
if pkg == nil {
- pkg = &DubboPackage{}
- pkg.Body = hessian.NewRequest([]interface{}{}, nil)
- pkg.Body = []interface{}{}
- pkg.Header.Type = hessian.PackageHeartbeat
- pkg.Header.SerialID = byte(S_Dubbo)
+ // make heartbeat package
+ header := impl.DubboHeader{
+ Type: impl.PackageHeartbeat,
+ SerialID: constant.S_Hessian2,
+ }
+ pkg = NewClientRequestPackage(header, impl.Service{})
+ // SetBody
+ reqPayload := impl.NewRequestPayload([]interface{}{}, nil)
+ pkg.SetBody(reqPayload)
+ // set serializer
+ if err := impl.LoadSerializer(pkg); err != nil {
+ return err
+ }
}
- pkg.Header.ID = int64(sequence)
+ pkg.SetID(int64(sequence))
// cond1
if rsp != nil {
@@ -335,9 +322,9 @@ func (c *Client) transfer(session getty.Session, pkg *DubboPackage,
c.addPendingResponse(rsp)
}
- err = session.WritePkg(pkg, c.opts.RequestTimeout)
+ err = session.WritePkg(pkg, c.Opts.RequestTimeout)
if err != nil {
- c.removePendingResponse(SequenceType(rsp.seq))
+ c.removePendingResponse(impl.SequenceType(rsp.seq))
} else if rsp != nil { // cond2
// cond2 should not merged with cond1. cause the response package may be returned very
// soon and it will be handled by other goroutine.
@@ -348,16 +335,57 @@ func (c *Client) transfer(session getty.Session, pkg *DubboPackage,
}
func (c *Client) addPendingResponse(pr *PendingResponse) {
- c.pendingResponses.Store(SequenceType(pr.seq), pr)
+ c.PendingResponses.Store(impl.SequenceType(pr.seq), pr)
}
-func (c *Client) removePendingResponse(seq SequenceType) *PendingResponse {
- if c.pendingResponses == nil {
+func (c *Client) removePendingResponse(seq impl.SequenceType) *PendingResponse {
+ if c.PendingResponses == nil {
return nil
}
- if presp, ok := c.pendingResponses.Load(seq); ok {
- c.pendingResponses.Delete(seq)
+ if presp, ok := c.PendingResponses.Load(seq); ok {
+ c.PendingResponses.Delete(seq)
return presp.(*PendingResponse)
}
return nil
}
+
+// PendingResponse holds the state of a request whose response has not yet
+// been handled by the Client.
+type PendingResponse struct {
+ // seq is the key under which the Client stores and removes this entry in PendingResponses.
+ seq uint64
+ // err is returned by Client.call once done fires, and reported as Cause by GetCallResponse.
+ err error
+ // start is set to the creation time by NewPendingResponse.
+ start time.Time
+ // readStart is reported as ReadStart by GetCallResponse.
+ readStart time.Time
+ // callback is the asynchronous callback passed to Client.call, if any.
+ callback common.AsyncCallback
+ // response is the Response supplied by the caller; GetCallResponse reports it as Reply.
+ response *Response
+ // done is the channel Client.call waits on for a synchronous two-way call.
+ done chan struct{}
+}
+
+// NewPendingResponse creates a PendingResponse stamped with the current time,
+// holding an empty Response and a fresh done channel.
+func NewPendingResponse() *PendingResponse {
+	pr := new(PendingResponse)
+	pr.start = time.Now()
+	pr.response = &Response{}
+	pr.done = make(chan struct{})
+	return pr
+}
+
+// GetCallResponse packages the error, timestamps and response held by r into
+// an AsyncCallbackResponse.
+func (r PendingResponse) GetCallResponse() common.CallbackResponse {
+	var result AsyncCallbackResponse
+	result.Cause = r.err
+	result.Start = r.start
+	result.ReadStart = r.readStart
+	result.Reply = r.response
+	return result
+}
+
+// NewClientRequestPackage builds a client-side request package, used just for
+// serialization, from header and svc. Body and Err are left at their zero
+// values and Codec is a new DubboCodec created with a nil argument.
+func NewClientRequestPackage(header impl.DubboHeader, svc impl.Service) *impl.DubboPackage {
+	pkg := &impl.DubboPackage{
+		Header:  header,
+		Service: svc,
+		Codec:   impl.NewDubboCodec(nil),
+	}
+	return pkg
+}
diff --git a/protocol/dubbo/config.go b/protocol/dubbo/impl/remoting/config.go
similarity index 81%
rename from protocol/dubbo/config.go
rename to protocol/dubbo/impl/remoting/config.go
index 635d121..1f26422 100644
--- a/protocol/dubbo/config.go
+++ b/protocol/dubbo/impl/remoting/config.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package dubbo
+package remoting
import (
"time"
@@ -33,16 +33,16 @@ type (
TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"`
TcpKeepAlive bool `default:"true" yaml:"tcp_keep_alive" json:"tcp_keep_alive,omitempty"`
KeepAlivePeriod string `default:"180s" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"`
- keepAlivePeriod time.Duration
+ KeepAlivePeriodD time.Duration
TcpRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"`
TcpWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"`
PkgWQSize int `default:"1024" yaml:"pkg_wq_size" json:"pkg_wq_size,omitempty"`
TcpReadTimeout string `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"`
- tcpReadTimeout time.Duration
+ TcpReadTimeoutD time.Duration
TcpWriteTimeout string `default:"5s" yaml:"tcp_write_timeout" json:"tcp_write_timeout,omitempty"`
- tcpWriteTimeout time.Duration
+ TcpWriteTimeoutD time.Duration
WaitTimeout string `default:"7s" yaml:"wait_timeout" json:"wait_timeout,omitempty"`
- waitTimeout time.Duration
+ WaitTimeoutD time.Duration
MaxMsgLen int `default:"1024" yaml:"max_msg_len" json:"max_msg_len,omitempty"`
SessionName string `default:"rpc" yaml:"session_name" json:"session_name,omitempty"`
}
@@ -50,9 +50,9 @@ type (
// ServerConfig holds supported types by the multiconfig package
ServerConfig struct {
// session
- SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
- sessionTimeout time.Duration
- SessionNumber int `default:"1000" yaml:"session_number" json:"session_number,omitempty"`
+ SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
+ SessionTimeoutD time.Duration
+ SessionNumber int `default:"1000" yaml:"session_number" json:"session_number,omitempty"`
// grpool
GrPoolSize int `default:"0" yaml:"gr_pool_size" json:"gr_pool_size,omitempty"`
@@ -67,16 +67,16 @@ type (
ClientConfig struct {
ReconnectInterval int `default:"0" yaml:"reconnect_interval" json:"reconnect_interval,omitempty"`
- // session pool
+ // session Pool
ConnectionNum int `default:"16" yaml:"connection_number" json:"connection_number,omitempty"`
// heartbeat
- HeartbeatPeriod string `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
- heartbeatPeriod time.Duration
+ HeartbeatPeriod string `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
+ HeartbeatPeriodD time.Duration
// session
- SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
- sessionTimeout time.Duration
+ SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
+ SessionTimeoutD time.Duration
// Connection Pool
PoolSize int `default:"2" yaml:"pool_size" json:"pool_size,omitempty"`
@@ -89,6 +89,9 @@ type (
// session tcp parameters
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
+
+ // serialization
+ Serialization string `default:"hessian2" yaml:"serialization" json:"serialization"`
}
)
@@ -104,6 +107,7 @@ func GetDefaultClientConfig() ClientConfig {
GrPoolSize: 200,
QueueLen: 64,
QueueNumber: 10,
+ Serialization: "hessian2",
GettySessionParam: GettySessionParam{
CompressEncoding: false,
TcpNoDelay: true,
@@ -149,19 +153,19 @@ func GetDefaultServerConfig() ServerConfig {
func (c *GettySessionParam) CheckValidity() error {
var err error
- if c.keepAlivePeriod, err = time.ParseDuration(c.KeepAlivePeriod); err != nil {
+ if c.KeepAlivePeriodD, err = time.ParseDuration(c.KeepAlivePeriod); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(KeepAlivePeriod{%#v})", c.KeepAlivePeriod)
}
- if c.tcpReadTimeout, err = time.ParseDuration(c.TcpReadTimeout); err != nil {
+ if c.TcpReadTimeoutD, err = time.ParseDuration(c.TcpReadTimeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(TcpReadTimeout{%#v})", c.TcpReadTimeout)
}
- if c.tcpWriteTimeout, err = time.ParseDuration(c.TcpWriteTimeout); err != nil {
+ if c.TcpWriteTimeoutD, err = time.ParseDuration(c.TcpWriteTimeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(TcpWriteTimeout{%#v})", c.TcpWriteTimeout)
}
- if c.waitTimeout, err = time.ParseDuration(c.WaitTimeout); err != nil {
+ if c.WaitTimeoutD, err = time.ParseDuration(c.WaitTimeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(WaitTimeout{%#v})", c.WaitTimeout)
}
@@ -174,16 +178,16 @@ func (c *ClientConfig) CheckValidity() error {
c.ReconnectInterval = c.ReconnectInterval * 1e6
- if c.heartbeatPeriod, err = time.ParseDuration(c.HeartbeatPeriod); err != nil {
+ if c.HeartbeatPeriodD, err = time.ParseDuration(c.HeartbeatPeriod); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(HeartbeatPeroid{%#v})", c.HeartbeatPeriod)
}
- if c.heartbeatPeriod >= time.Duration(getty.MaxWheelTimeSpan) {
+ if c.HeartbeatPeriodD >= time.Duration(getty.MaxWheelTimeSpan) {
return perrors.WithMessagef(err, "heartbeat_period %s should be less than %s",
c.HeartbeatPeriod, time.Duration(getty.MaxWheelTimeSpan))
}
- if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil {
+ if c.SessionTimeoutD, err = time.ParseDuration(c.SessionTimeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout)
}
@@ -194,11 +198,11 @@ func (c *ClientConfig) CheckValidity() error {
func (c *ServerConfig) CheckValidity() error {
var err error
- if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil {
+ if c.SessionTimeoutD, err = time.ParseDuration(c.SessionTimeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout)
}
- if c.sessionTimeout >= time.Duration(getty.MaxWheelTimeSpan) {
+ if c.SessionTimeoutD >= time.Duration(getty.MaxWheelTimeSpan) {
return perrors.WithMessagef(err, "session_timeout %s should be less than %s",
c.SessionTimeout, time.Duration(getty.MaxWheelTimeSpan))
}
diff --git a/protocol/dubbo/impl/remoting/errors.go b/protocol/dubbo/impl/remoting/errors.go
new file mode 100644
index 0000000..8fda9b3
--- /dev/null
+++ b/protocol/dubbo/impl/remoting/errors.go
@@ -0,0 +1,17 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remoting
diff --git a/protocol/dubbo/pool.go b/protocol/dubbo/impl/remoting/pool.go
similarity index 91%
rename from protocol/dubbo/pool.go
rename to protocol/dubbo/impl/remoting/pool.go
index c9f5e34..dc8b123 100644
--- a/protocol/dubbo/pool.go
+++ b/protocol/dubbo/impl/remoting/pool.go
@@ -15,10 +15,11 @@
* limitations under the License.
*/
-package dubbo
+package remoting
import (
"fmt"
+
"math/rand"
"net"
"sync"
@@ -49,7 +50,7 @@ type gettyRPCClient struct {
}
var (
- errClientPoolClosed = perrors.New("client pool closed")
+ errClientPoolClosed = perrors.New("client Pool closed")
)
func newGettyRPCClientConn(pool *gettyRPCClientPool, protocol, addr string) (*gettyRPCClient, error) {
@@ -59,13 +60,13 @@ func newGettyRPCClientConn(pool *gettyRPCClientPool, protocol, addr string) (*ge
pool: pool,
gettyClient: getty.NewTCPClient(
getty.WithServerAddress(addr),
- getty.WithConnectionNumber((int)(pool.rpcClient.conf.ConnectionNum)),
- getty.WithReconnectInterval(pool.rpcClient.conf.ReconnectInterval),
+ getty.WithConnectionNumber((int)(pool.rpcClient.Conf.ConnectionNum)),
+ getty.WithReconnectInterval(pool.rpcClient.Conf.ReconnectInterval),
),
}
go c.gettyClient.RunEventLoop(c.newSession)
idx := 1
- times := int(pool.rpcClient.opts.ConnectTimeout / 1e6)
+ times := int(pool.rpcClient.Opts.ConnectTimeout / 1e6)
for {
idx++
if c.isAvailable() {
@@ -99,7 +100,7 @@ func (c *gettyRPCClient) newSession(session getty.Session) error {
conf ClientConfig
)
- conf = c.pool.rpcClient.conf
+ conf = c.pool.rpcClient.Conf
if conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
@@ -111,7 +112,7 @@ func (c *gettyRPCClient) newSession(session getty.Session) error {
tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive)
if conf.GettySessionParam.TcpKeepAlive {
- tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod)
+ tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.KeepAlivePeriodD)
}
tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
@@ -121,10 +122,10 @@ func (c *gettyRPCClient) newSession(session getty.Session) error {
session.SetPkgHandler(NewRpcClientPackageHandler(c.pool.rpcClient))
session.SetEventListener(NewRpcClientHandler(c))
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
- session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
- session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
- session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
- session.SetWaitTime(conf.GettySessionParam.waitTimeout)
+ session.SetReadTimeout(conf.GettySessionParam.TcpReadTimeoutD)
+ session.SetWriteTimeout(conf.GettySessionParam.TcpWriteTimeoutD)
+ session.SetCronPeriod((int)(conf.HeartbeatPeriodD.Nanoseconds() / 1e6))
+ session.SetWaitTime(conf.GettySessionParam.WaitTimeoutD)
logger.Debugf("client new session:%s\n", session.Stat())
session.SetTaskPool(clientGrpool)
@@ -296,7 +297,7 @@ type gettyRPCClientPool struct {
conns []*gettyRPCClient
}
-func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientPool {
+func NewGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientPool {
return &gettyRPCClientPool{
rpcClient: rpcClient,
size: size,
@@ -368,7 +369,7 @@ func (p *gettyRPCClientPool) put(conn *gettyRPCClient) {
}
if len(p.conns) >= p.size {
- // delete @conn from client pool
+ // delete @conn from client Pool
// p.remove(conn)
conn.close()
return
diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/impl/remoting/readwriter.go
similarity index 52%
rename from protocol/dubbo/readwriter.go
rename to protocol/dubbo/impl/remoting/readwriter.go
index 9cc7ea2..6d211f5 100644
--- a/protocol/dubbo/readwriter.go
+++ b/protocol/dubbo/impl/remoting/readwriter.go
@@ -15,81 +15,100 @@
* limitations under the License.
*/
-package dubbo
+// TODO(zero.xu): separate the client and server handlers in readwriter.
+package remoting
import (
+ "bufio"
"bytes"
"reflect"
)
import (
- "github.com/apache/dubbo-go-hessian2"
+ hessian "github.com/apache/dubbo-go-hessian2"
"github.com/dubbogo/getty"
perrors "github.com/pkg/errors"
)
import (
- "github.com/apache/dubbo-go/common"
- "github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
+ "github.com/apache/dubbo-go/protocol/dubbo/impl"
)
////////////////////////////////////////////
// RpcClientPackageHandler
////////////////////////////////////////////
-// RpcClientPackageHandler handle package for client in getty.
+// RpcClientPackageHandler handles packages for the client in getty.
type RpcClientPackageHandler struct {
client *Client
}
-// NewRpcClientPackageHandler create a RpcClientPackageHandler.
+// NewRpcClientPackageHandler creates a RpcClientPackageHandler for the given client.
func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler {
return &RpcClientPackageHandler{client: client}
}
-// Read decode @data to DubboPackage.
func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
- pkg := &DubboPackage{}
-
- buf := bytes.NewBuffer(data)
- err := pkg.Unmarshal(buf, p.client)
- if err != nil {
+ pkg := NewClientResponsePackage(data)
+ if err := pkg.ReadHeader(); err != nil {
originErr := perrors.Cause(err)
if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
return nil, 0, nil
}
-
- logger.Errorf("pkg.Unmarshal(ss:%+v, len(@data):%d) = error:%+v", ss, len(data), err)
-
+ logger.Errorf("[RpcClientPackageHandler.Read] ss:%+v, len(@data):%d) = error:%+v ", ss, len(data), err)
return nil, 0, perrors.WithStack(err)
}
+ if pkg.IsHeartBeat() {
+ // heartbeat package doesn't need deserialize
+ return pkg, pkg.GetLen(), nil
+ }
- if pkg.Header.Type&hessian.PackageRequest == 0x00 {
- pkg.Err = pkg.Body.(*hessian.Response).Exception
- pkg.Body = NewResponse(pkg.Body.(*hessian.Response).RspObj, pkg.Body.(*hessian.Response).Attachments)
+ if err := impl.LoadSerializer(pkg); err != nil {
+ return nil, 0, err
}
- return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil
+ // load response
+ pendingRsp, ok := p.client.PendingResponses.Load(impl.SequenceType(pkg.GetHeader().ID))
+ if !ok {
+ return nil, 0, perrors.Errorf("client.GetPendingResopnse(%v) = nil", pkg.GetHeader().ID)
+ }
+ // set package body
+ body := impl.NewResponsePayload(pendingRsp.(*PendingResponse).response.Reply, nil, nil)
+ pkg.SetBody(body)
+ err := pkg.Unmarshal()
+ if err != nil {
+ return nil, 0, perrors.WithStack(err)
+ }
+ resp := pkg.Body.(*impl.ResponsePayload)
+ pkg.Err = resp.Exception
+ pkg.Body = NewResponse(resp.RspObj, resp.Attachments)
+ return pkg, pkg.GetLen(), nil
}
-// Write encode @pkg.
func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) {
- req, ok := pkg.(*DubboPackage)
+ req, ok := pkg.(*impl.DubboPackage)
if !ok {
- logger.Errorf("illegal pkg:%+v\n", pkg)
return nil, perrors.New("invalid rpc request")
}
-
buf, err := req.Marshal()
if err != nil {
logger.Warnf("binary.Write(req{%#v}) = err{%#v}", req, perrors.WithStack(err))
return nil, perrors.WithStack(err)
}
-
return buf.Bytes(), nil
}
+func NewClientResponsePackage(data []byte) *impl.DubboPackage {
+ return &impl.DubboPackage{
+ Header: impl.DubboHeader{},
+ Service: impl.Service{},
+ Body: &impl.ResponsePayload{},
+ Err: nil,
+ Codec: impl.NewDubboCodec(bufio.NewReaderSize(bytes.NewBuffer(data), len(data))),
+ }
+}
+
////////////////////////////////////////////
// RpcServerPackageHandler
////////////////////////////////////////////
@@ -98,17 +117,29 @@ var (
rpcServerPkgHandler = &RpcServerPackageHandler{}
)
-// RpcServerPackageHandler handle package for server in getty.
-type RpcServerPackageHandler struct{}
+// RpcServerPackageHandler handles packages for the server in getty.
+type RpcServerPackageHandler struct {
+}
-// Read decode @data to DubboPackage.
func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
- pkg := &DubboPackage{
- Body: make([]interface{}, 7),
+ pkg := NewServerRequestPackage(data)
+ if err := pkg.ReadHeader(); err != nil {
+ originErr := perrors.Cause(err)
+ if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
+ return nil, 0, nil
+ }
+ return nil, 0, perrors.WithStack(err)
+ }
+
+ if pkg.IsHeartBeat() {
+ return pkg, pkg.GetLen(), nil
}
- buf := bytes.NewBuffer(data)
- err := pkg.Unmarshal(buf)
+ if err := impl.LoadSerializer(pkg); err != nil {
+ return nil, 0, err
+ }
+
+ err := pkg.Unmarshal()
if err != nil {
originErr := perrors.Cause(err)
if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough {
@@ -116,75 +147,33 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface
}
logger.Errorf("pkg.Unmarshal(ss:%+v, len(@data):%d) = error:%+v", ss, len(data), err)
-
return nil, 0, perrors.WithStack(err)
}
-
- if pkg.Header.Type&hessian.PackageHeartbeat == 0x00 {
- // convert params of request
- req := pkg.Body.([]interface{}) // length of body should be 7
- if len(req) > 0 {
- var dubboVersion, argsTypes string
- var args []interface{}
- var attachments map[string]string
- if req[0] != nil {
- dubboVersion = req[0].(string)
- }
- if req[1] != nil {
- pkg.Service.Path = req[1].(string)
- }
- if req[2] != nil {
- pkg.Service.Version = req[2].(string)
- }
- if req[3] != nil {
- pkg.Service.Method = req[3].(string)
- }
- if req[4] != nil {
- argsTypes = req[4].(string)
- }
- if req[5] != nil {
- args = req[5].([]interface{})
- }
- if req[6] != nil {
- attachments = req[6].(map[string]string)
- }
- if pkg.Service.Path == "" && len(attachments[constant.PATH_KEY]) > 0 {
- pkg.Service.Path = attachments[constant.PATH_KEY]
- }
- if _, ok := attachments[constant.INTERFACE_KEY]; ok {
- pkg.Service.Interface = attachments[constant.INTERFACE_KEY]
- } else {
- pkg.Service.Interface = pkg.Service.Path
- }
- if len(attachments[constant.GROUP_KEY]) > 0 {
- pkg.Service.Group = attachments[constant.GROUP_KEY]
- }
- pkg.Body = map[string]interface{}{
- "dubboVersion": dubboVersion,
- "argsTypes": argsTypes,
- "args": args,
- "service": common.ServiceMap.GetService(DUBBO, pkg.Service.Path), // path as a key
- "attachments": attachments,
- }
- }
- }
-
- return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil
+ return pkg, pkg.GetLen(), nil
}
-// Write encode @pkg.
func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) {
- res, ok := pkg.(*DubboPackage)
+ res, ok := pkg.(*impl.DubboPackage)
if !ok {
logger.Errorf("illegal pkg:%+v\n, it is %+v", pkg, reflect.TypeOf(pkg))
return nil, perrors.New("invalid rpc response")
}
-
buf, err := res.Marshal()
if err != nil {
logger.Warnf("binary.Write(res{%#v}) = err{%#v}", res, perrors.WithStack(err))
return nil, perrors.WithStack(err)
}
-
return buf.Bytes(), nil
}
+
+// server side receive request package, just for deserialization
+func NewServerRequestPackage(data []byte) *impl.DubboPackage {
+ return &impl.DubboPackage{
+ Header: impl.DubboHeader{},
+ Service: impl.Service{},
+ Body: make([]interface{}, 7),
+ Err: nil,
+ Codec: impl.NewDubboCodec(bufio.NewReaderSize(bytes.NewBuffer(data), len(data))),
+ }
+
+}
diff --git a/protocol/dubbo/server.go b/protocol/dubbo/impl/remoting/server_impl.go
similarity index 62%
copy from protocol/dubbo/server.go
copy to protocol/dubbo/impl/remoting/server_impl.go
index 8de353a..6419ff6 100644
--- a/protocol/dubbo/server.go
+++ b/protocol/dubbo/impl/remoting/server_impl.go
@@ -14,8 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package dubbo
+package remoting
import (
"fmt"
@@ -23,95 +22,60 @@ import (
)
import (
- "github.com/dubbogo/getty"
- "github.com/dubbogo/gost/sync"
- "gopkg.in/yaml.v2"
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/logger"
)
import (
- "github.com/apache/dubbo-go/common"
- "github.com/apache/dubbo-go/common/logger"
- "github.com/apache/dubbo-go/config"
+ "github.com/dubbogo/getty"
+ gxsync "github.com/dubbogo/gost/sync"
)
+// TODO: this needs to be moved to the business-level implementation.
var (
srvConf *ServerConfig
srvGrpool *gxsync.TaskPool
)
-func init() {
-
- // load clientconfig from provider_config
- // default use dubbo
- providerConfig := config.GetProviderConfig()
- if providerConfig.ApplicationConfig == nil {
- return
- }
- protocolConf := providerConfig.ProtocolConf
- defaultServerConfig := GetDefaultServerConfig()
- if protocolConf == nil {
- logger.Info("protocol_conf default use dubbo config")
- } else {
- dubboConf := protocolConf.(map[interface{}]interface{})[DUBBO]
- if dubboConf == nil {
- logger.Warnf("dubboConf is nil")
- return
- }
-
- dubboConfByte, err := yaml.Marshal(dubboConf)
- if err != nil {
- panic(err)
- }
- err = yaml.Unmarshal(dubboConfByte, &defaultServerConfig)
- if err != nil {
- panic(err)
- }
- }
- srvConf = &defaultServerConfig
- if err := srvConf.CheckValidity(); err != nil {
- panic(err)
- }
- setServerGrpool()
-}
-
-// SetServerConfig set dubbo server config.
+// SetServerConfig sets the dubbo server config and panics if the config is invalid.
func SetServerConfig(s ServerConfig) {
srvConf = &s
err := srvConf.CheckValidity()
if err != nil {
- logger.Warnf("[ServerConfig CheckValidity] error: %v", err)
+ panic(err)
return
}
- setServerGrpool()
+ SetServerGrpool()
}
-// GetServerConfig get dubbo server config.
-func GetServerConfig() ServerConfig {
- return *srvConf
+// GetServerConfig returns a pointer to the current dubbo server config.
+func GetServerConfig() *ServerConfig {
+ return srvConf
}
-func setServerGrpool() {
+// SetServerGrpool creates the server task pool when GrPoolSize is greater than 1.
+func SetServerGrpool() {
if srvConf.GrPoolSize > 1 {
srvGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(srvConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(srvConf.QueueLen),
gxsync.WithTaskPoolTaskQueueNumber(srvConf.QueueNumber))
}
}
-// Server is dubbo protocol server.
+// Server is the dubbo protocol server.
type Server struct {
conf ServerConfig
tcpServer getty.Server
rpcHandler *RpcServerHandler
}
-// NewServer create a new Server.
-func NewServer() *Server {
+// NewServer creates a new Server whose RPC handler dispatches to the given StubHandler.
+func NewServer(handler StubHandler) *Server {
s := &Server{
conf: *srvConf,
}
- s.rpcHandler = NewRpcServerHandler(s.conf.SessionNumber, s.conf.sessionTimeout)
+ s.rpcHandler = NewRpcServerHandler(handler, s.conf.SessionNumber, s.conf.SessionTimeoutD)
return s
}
@@ -134,7 +98,7 @@ func (s *Server) newSession(session getty.Session) error {
tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive)
if conf.GettySessionParam.TcpKeepAlive {
- tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod)
+ tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.KeepAlivePeriodD)
}
tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
@@ -144,10 +108,10 @@ func (s *Server) newSession(session getty.Session) error {
session.SetPkgHandler(rpcServerPkgHandler)
session.SetEventListener(s.rpcHandler)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
- session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
- session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
- session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6))
- session.SetWaitTime(conf.GettySessionParam.waitTimeout)
+ session.SetReadTimeout(conf.GettySessionParam.TcpReadTimeoutD)
+ session.SetWriteTimeout(conf.GettySessionParam.TcpWriteTimeoutD)
+ session.SetCronPeriod((int)(conf.SessionTimeoutD.Nanoseconds() / 1e6))
+ session.SetWaitTime(conf.GettySessionParam.WaitTimeoutD)
logger.Debugf("app accepts new session:%s\n", session.Stat())
session.SetTaskPool(srvGrpool)
@@ -155,7 +119,7 @@ func (s *Server) newSession(session getty.Session) error {
return nil
}
-// Start start dubbo server.
+// Start starts the dubbo server.
func (s *Server) Start(url common.URL) {
var (
addr string
@@ -172,7 +136,7 @@ func (s *Server) Start(url common.URL) {
}
-// Stop stop dubbo server.
+// Stop stops the dubbo server by closing its TCP server.
func (s *Server) Stop() {
s.tcpServer.Close()
}
diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/impl/remoting/server_listener.go
similarity index 60%
rename from protocol/dubbo/listener.go
rename to protocol/dubbo/impl/remoting/server_listener.go
index 4834459..f59e88e 100644
--- a/protocol/dubbo/listener.go
+++ b/protocol/dubbo/impl/remoting/server_listener.go
@@ -15,30 +15,23 @@
* limitations under the License.
*/
-package dubbo
+package remoting
import (
- "context"
- "fmt"
- "net/url"
"sync"
"sync/atomic"
"time"
)
import (
- "github.com/apache/dubbo-go-hessian2"
+ hessian "github.com/apache/dubbo-go-hessian2"
"github.com/dubbogo/getty"
- "github.com/opentracing/opentracing-go"
perrors "github.com/pkg/errors"
)
import (
- "github.com/apache/dubbo-go/common"
- "github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
- "github.com/apache/dubbo-go/protocol"
- "github.com/apache/dubbo-go/protocol/invocation"
+ "github.com/apache/dubbo-go/protocol/dubbo/impl"
)
// todo: writePkg_Timeout will entry *.yml
@@ -99,23 +92,16 @@ func (h *RpcClientHandler) OnClose(session getty.Session) {
// OnMessage notified when RPC client session got any message in connection
func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
- p, ok := pkg.(*DubboPackage)
+ p, ok := pkg.(*impl.DubboPackage)
if !ok {
logger.Errorf("illegal package")
return
}
- if p.Header.Type&hessian.PackageHeartbeat != 0x00 {
- if p.Header.Type&hessian.PackageResponse != 0x00 {
- logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body)
- if p.Err != nil {
- logger.Errorf("rpc heartbeat response{error: %#v}", p.Err)
- }
- h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID))
- } else {
- logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body)
- p.Header.ResponseStatus = hessian.Response_OK
- reply(session, p, hessian.PackageHeartbeat)
+ if p.Header.Type&impl.PackageHeartbeat != 0x00 {
+ logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body)
+ if p.Err != nil {
+ logger.Errorf("rpc heartbeat response{error: %#v}", p.Err)
}
return
}
@@ -123,7 +109,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
h.conn.updateSession(session)
- pendingResponse := h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID))
+ pendingResponse := h.conn.pool.rpcClient.removePendingResponse(impl.SequenceType(p.Header.ID))
if pendingResponse == nil {
logger.Errorf("failed to get pending response context for response package %s", *p)
return
@@ -133,27 +119,28 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
pendingResponse.err = p.Err
}
- pendingResponse.response.atta = p.Body.(*Response).atta
+ pendingResponse.response.Atta = p.Body.(*Response).Atta
if pendingResponse.callback == nil {
pendingResponse.done <- struct{}{}
} else {
+ logger.Info("proxy service callback")
pendingResponse.callback(pendingResponse.GetCallResponse())
}
}
// OnCron notified when RPC client session got any message in cron job
func (h *RpcClientHandler) OnCron(session getty.Session) {
- clientRpcSession, err := h.conn.getClientRpcSession(session)
+ rpcSession, err := h.conn.getClientRpcSession(session)
if err != nil {
logger.Errorf("client.getClientSession(session{%s}) = error{%v}",
session.Stat(), perrors.WithStack(err))
return
}
- if h.conn.pool.rpcClient.conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() {
+ if h.conn.pool.rpcClient.Conf.SessionTimeoutD.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() {
logger.Warnf("session{%s} timeout{%s}, reqNum{%d}",
- session.Stat(), time.Since(session.GetActive()).String(), clientRpcSession.reqNum)
- h.conn.removeSession(session) // -> h.conn.close() -> h.conn.pool.remove(h.conn)
+ session.Stat(), time.Since(session.GetActive()).String(), rpcSession.reqNum)
+ h.conn.removeSession(session) // -> h.conn.close() -> h.conn.Pool.remove(h.conn)
return
}
@@ -164,20 +151,31 @@ func (h *RpcClientHandler) OnCron(session getty.Session) {
// RpcServerHandler
// //////////////////////////////////////////
-// RpcServerHandler is handler of RPC Server
+type StubHandler interface {
+ OnPackage(session getty.Session, pkg *impl.DubboPackage)
+}
+
+type StubFunc func(session getty.Session, pkg *impl.DubboPackage)
+
+func (f StubFunc) OnPackage(session getty.Session, pkg *impl.DubboPackage) {
+ f(session, pkg)
+}
+
type RpcServerHandler struct {
maxSessionNum int
sessionTimeout time.Duration
sessionMap map[getty.Session]*rpcSession
rwlock sync.RWMutex
+ stub StubHandler
}
// NewRpcServerHandler creates RpcServerHandler with @maxSessionNum and @sessionTimeout
-func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler {
+func NewRpcServerHandler(stubHandler StubHandler, maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler {
return &RpcServerHandler{
maxSessionNum: maxSessionNum,
sessionTimeout: sessionTimeout,
sessionMap: make(map[getty.Session]*rpcSession),
+ stub: stubHandler,
}
}
@@ -224,87 +222,51 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
}
h.rwlock.Unlock()
- p, ok := pkg.(*DubboPackage)
+ p, ok := pkg.(*impl.DubboPackage)
if !ok {
logger.Errorf("illegal package{%#v}", pkg)
return
}
- p.Header.ResponseStatus = hessian.Response_OK
+ p.SetResponseStatus(hessian.Response_OK)
+ //p.Header.ResponseStatus = hessian.Response_OK
// heartbeat
- if p.Header.Type&hessian.PackageHeartbeat != 0x00 {
- logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body)
- reply(session, p, hessian.PackageHeartbeat)
+ if p.GetHeader().Type&impl.PackageHeartbeat != 0x00 {
+ logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.GetHeader(), p.GetService(), p.GetBody())
+ h.reply(session, p, impl.PackageHeartbeat)
return
}
twoway := true
// not twoway
- if p.Header.Type&hessian.PackageRequest_TwoWay == 0x00 {
+ if p.GetHeader().Type&impl.PackageRequest_TwoWay == 0x00 {
twoway = false
}
defer func() {
if e := recover(); e != nil {
- p.Header.ResponseStatus = hessian.Response_SERVER_ERROR
+ p.SetResponseStatus(hessian.Response_SERVER_ERROR)
if err, ok := e.(error); ok {
logger.Errorf("OnMessage panic: %+v", perrors.WithStack(err))
- p.Body = perrors.WithStack(err)
+ p.SetBody(perrors.WithStack(err))
} else if err, ok := e.(string); ok {
logger.Errorf("OnMessage panic: %+v", perrors.New(err))
- p.Body = perrors.New(err)
+ p.SetBody(perrors.New(err))
} else {
logger.Errorf("OnMessage panic: %+v, this is impossible.", e)
- p.Body = e
+ p.SetBody(e)
}
if !twoway {
return
}
- reply(session, p, hessian.PackageResponse)
+ h.reply(session, p, impl.PackageResponse)
}
}()
- u := common.NewURLWithOptions(common.WithPath(p.Service.Path), common.WithParams(url.Values{}),
- common.WithParamsValue(constant.GROUP_KEY, p.Service.Group),
- common.WithParamsValue(constant.INTERFACE_KEY, p.Service.Interface),
- common.WithParamsValue(constant.VERSION_KEY, p.Service.Version))
- exporter, _ := dubboProtocol.ExporterMap().Load(u.ServiceKey())
- if exporter == nil {
- err := fmt.Errorf("don't have this exporter, key: %s", u.ServiceKey())
- logger.Errorf(err.Error())
- p.Header.ResponseStatus = hessian.Response_OK
- p.Body = err
- reply(session, p, hessian.PackageResponse)
- return
- }
- invoker := exporter.(protocol.Exporter).GetInvoker()
- if invoker != nil {
- attachments := p.Body.(map[string]interface{})["attachments"].(map[string]string)
- attachments[constant.LOCAL_ADDR] = session.LocalAddr()
- attachments[constant.REMOTE_ADDR] = session.RemoteAddr()
-
- args := p.Body.(map[string]interface{})["args"].([]interface{})
- inv := invocation.NewRPCInvocation(p.Service.Method, args, attachments)
-
- ctx := rebuildCtx(inv)
-
- result := invoker.Invoke(ctx, inv)
- if err := result.Error(); err != nil {
- p.Header.ResponseStatus = hessian.Response_OK
- p.Body = hessian.NewResponse(nil, err, result.Attachments())
- } else {
- res := result.Result()
- p.Header.ResponseStatus = hessian.Response_OK
- p.Body = hessian.NewResponse(res, nil, result.Attachments())
- }
- }
-
- if !twoway {
- return
- }
- reply(session, p, hessian.PackageResponse)
+ h.stub.OnPackage(session, p)
+ h.reply(session, p, impl.PackageResponse)
}
// OnCron notified when RPC server session got any message in cron job
@@ -333,38 +295,35 @@ func (h *RpcServerHandler) OnCron(session getty.Session) {
}
}
-// rebuildCtx rebuild the context by attachment.
-// Once we decided to transfer more context's key-value, we should change this.
-// now we only support rebuild the tracing context
-func rebuildCtx(inv *invocation.RPCInvocation) context.Context {
- ctx := context.Background()
-
- // actually, if user do not use any opentracing framework, the err will not be nil.
- spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
- opentracing.TextMapCarrier(inv.Attachments()))
- if err == nil {
- ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx)
+func (h *RpcServerHandler) reply(session getty.Session, req *impl.DubboPackage, tp impl.PackageType) {
+ header := impl.DubboHeader{
+ SerialID: req.GetHeader().SerialID,
+ Type: tp,
+ ID: req.GetHeader().ID,
+ BodyLen: 0,
+ ResponseStatus: req.GetHeader().ResponseStatus,
}
- return ctx
-}
-
-func reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) {
- resp := &DubboPackage{
- Header: hessian.DubboHeader{
- SerialID: req.Header.SerialID,
- Type: tp,
- ID: req.Header.ID,
- ResponseStatus: req.Header.ResponseStatus,
- },
+ resp := NewServerResponsePackage(header)
+ if err := impl.LoadSerializer(resp); err != nil {
+ logger.Errorf("Reply error %v", err)
+ return
}
- if req.Header.Type&hessian.PackageRequest != 0x00 {
- resp.Body = req.Body
- } else {
- resp.Body = nil
+ if req.GetHeader().Type&impl.PackageRequest != 0x00 {
+ resp.SetBody(req.GetBody())
}
if err := session.WritePkg(resp, writePkg_Timeout); err != nil {
- logger.Errorf("WritePkg error: %#v, %#v", perrors.WithStack(err), req.Header)
+ logger.Errorf("WritePkg error: %#v, %#v", perrors.WithStack(err), req.GetHeader())
+ }
+}
+
+// server side response package, just for serialization
+func NewServerResponsePackage(header impl.DubboHeader) *impl.DubboPackage {
+ return &impl.DubboPackage{
+ Header: header,
+ Body: nil,
+ Err: nil,
+ Codec: impl.NewDubboCodec(nil),
}
}
diff --git a/protocol/dubbo/impl/request.go b/protocol/dubbo/impl/request.go
new file mode 100644
index 0000000..0e770c3
--- /dev/null
+++ b/protocol/dubbo/impl/request.go
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package impl
+
+type RequestPayload struct { // RequestPayload pairs request parameters with their attachments.
+ Params interface{} // set from the args argument of NewRequestPayload
+ Attachments map[string]string // string key/value pairs; never nil when built by NewRequestPayload
+}
+
+// NewRequestPayload creates a RequestPayload holding args as Params and atta as
+// Attachments. A nil atta is replaced by an empty map.
+func NewRequestPayload(args interface{}, atta map[string]string) *RequestPayload {
+	payload := &RequestPayload{Params: args, Attachments: atta}
+	if payload.Attachments == nil {
+		payload.Attachments = make(map[string]string)
+	}
+	return payload
+}
+
+func EnsureRequestPayload(body interface{}) *RequestPayload { // EnsureRequestPayload converts body into a *RequestPayload.
+ if req, ok := body.(*RequestPayload); ok { // body already is a payload: hand it back unchanged
+ return req
+ }
+ return NewRequestPayload(body, nil) // otherwise body becomes Params of a new payload with empty attachments
+}
diff --git a/protocol/dubbo/impl/response.go b/protocol/dubbo/impl/response.go
new file mode 100644
index 0000000..ea0a6ef
--- /dev/null
+++ b/protocol/dubbo/impl/response.go
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package impl
+
+type ResponsePayload struct { // ResponsePayload groups a response object, an exception and attachments.
+ RspObj interface{} // set from the rspObj argument of NewResponsePayload
+ Exception error // set from the exception argument of NewResponsePayload
+ Attachments map[string]string // string key/value pairs; never nil when built by NewResponsePayload
+}
+
+// NewResponsePayload creates a new ResponsePayload carrying rspObj, exception
+// and attachments. A nil attachments map is replaced by an empty one, so the
+// Attachments field of the returned payload is never nil.
+func NewResponsePayload(rspObj interface{}, exception error, attachments map[string]string) *ResponsePayload {
+	payload := &ResponsePayload{RspObj: rspObj, Exception: exception, Attachments: attachments}
+	// substitute an empty map so the field can be written to right away
+	if payload.Attachments == nil {
+		payload.Attachments = make(map[string]string)
+	}
+	return payload
+}
+
+func EnsureResponsePayload(body interface{}) *ResponsePayload { // EnsureResponsePayload converts body into a *ResponsePayload.
+ if res, ok := body.(*ResponsePayload); ok { // body already is a payload: hand it back unchanged
+ return res
+ }
+ if exp, ok := body.(error); ok { // an error body becomes the Exception of a new payload
+ return NewResponsePayload(nil, exp, nil)
+ }
+ return NewResponsePayload(body, nil, nil) // any other body becomes RspObj of a new payload
+}
diff --git a/protocol/dubbo/impl/serialization.go b/protocol/dubbo/impl/serialization.go
new file mode 100644
index 0000000..7ce76a8
--- /dev/null
+++ b/protocol/dubbo/impl/serialization.go
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package impl
+
+import (
+ "fmt"
+)
+
+import (
+ "github.com/apache/dubbo-go/common/constant"
+)
+
+var (
+ serializers = make(map[string]interface{}) // serialization name -> serializer, written by SetSerializer
+ nameMaps = make(map[byte]string) // serial id -> serialization name, reassigned in init
+)
+
+// init records in nameMaps the serialization name that belongs to each of the
+// serial ids constant.S_Hessian2 and constant.S_Proto.
+func init() {
+	nameMaps[constant.S_Hessian2] = constant.HESSIAN2_SERIALIZATION
+	nameMaps[constant.S_Proto] = constant.PROTOBUF_SERIALIZATION
+}
+
+func SetSerializer(name string, serializer interface{}) { // SetSerializer stores serializer under name, replacing any earlier entry.
+ serializers[name] = serializer // plain map write; no locking is done here
+}
+
+// GetSerializerById returns the serializer registered for serial id, or an error when the id or its name is unknown.
+func GetSerializerById(id byte) (interface{}, error) {
+	name, ok := nameMaps[id]
+	if !ok { // the id has no serialization name
+		return nil, fmt.Errorf("serialId %d not found", id)
+	}
+	if serializer, ok := serializers[name]; ok {
+		return serializer, nil
+	}
+	return nil, fmt.Errorf("serialization %s not found", name) // name known, but nothing was registered via SetSerializer
+}
diff --git a/protocol/dubbo/impl/serialize.go b/protocol/dubbo/impl/serialize.go
new file mode 100644
index 0000000..1f913f7
--- /dev/null
+++ b/protocol/dubbo/impl/serialize.go
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package impl
+
+import (
+ "github.com/apache/dubbo-go/common/constant"
+)
+
+type Serializer interface { // Serializer is the type LoadSerializer asserts before calling DubboPackage.SetSerializer.
+ Marshal(p DubboPackage) ([]byte, error) // takes the package by value; presumably returns its encoded bytes (confirm in implementations)
+ Unmarshal([]byte, *DubboPackage) error // takes bytes and a package pointer; presumably decodes into the package (confirm in implementations)
+}
+
+// LoadSerializer sets on p the serializer registered for p.Header.SerialID; a SerialID of 0 defaults to S_Hessian2.
+func LoadSerializer(p *DubboPackage) error {
+	serialID := p.Header.SerialID
+	if serialID == 0 {
+		serialID = constant.S_Hessian2
+	}
+	serializer, err := GetSerializerById(serialID)
+	if err != nil {
+		return err // hand the lookup failure to the caller instead of panicking
+	}
+	p.SetSerializer(serializer.(Serializer)) // still panics if the registered value is not a Serializer
+	return nil
+}
diff --git a/protocol/dubbo/server.go b/protocol/dubbo/server.go
index 8de353a..134a58c 100644
--- a/protocol/dubbo/server.go
+++ b/protocol/dubbo/server.go
@@ -18,25 +18,25 @@
package dubbo
import (
+ "context"
"fmt"
- "net"
+ "net/url"
)
-
import (
- "github.com/dubbogo/getty"
- "github.com/dubbogo/gost/sync"
+ "github.com/opentracing/opentracing-go"
"gopkg.in/yaml.v2"
)
import (
"github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
-)
-
-var (
- srvConf *ServerConfig
- srvGrpool *gxsync.TaskPool
+ "github.com/apache/dubbo-go/protocol"
+ "github.com/apache/dubbo-go/protocol/dubbo/impl"
+ "github.com/apache/dubbo-go/protocol/dubbo/impl/remoting"
+ "github.com/apache/dubbo-go/protocol/invocation"
+ "github.com/dubbogo/getty"
)
func init() {
@@ -48,7 +48,7 @@ func init() {
return
}
protocolConf := providerConfig.ProtocolConf
- defaultServerConfig := GetDefaultServerConfig()
+ defaultServerConfig := remoting.GetDefaultServerConfig()
if protocolConf == nil {
logger.Info("protocol_conf default use dubbo config")
} else {
@@ -67,112 +67,59 @@ func init() {
panic(err)
}
}
- srvConf = &defaultServerConfig
- if err := srvConf.CheckValidity(); err != nil {
- panic(err)
- }
- setServerGrpool()
-}
-
-// SetServerConfig set dubbo server config.
-func SetServerConfig(s ServerConfig) {
- srvConf = &s
- err := srvConf.CheckValidity()
- if err != nil {
- logger.Warnf("[ServerConfig CheckValidity] error: %v", err)
- return
- }
- setServerGrpool()
-}
-
-// GetServerConfig get dubbo server config.
-func GetServerConfig() ServerConfig {
- return *srvConf
-}
-
-func setServerGrpool() {
- if srvConf.GrPoolSize > 1 {
- srvGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(srvConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(srvConf.QueueLen),
- gxsync.WithTaskPoolTaskQueueNumber(srvConf.QueueNumber))
- }
-}
-
-// Server is dubbo protocol server.
-type Server struct {
- conf ServerConfig
- tcpServer getty.Server
- rpcHandler *RpcServerHandler
-}
-
-// NewServer create a new Server.
-func NewServer() *Server {
-
- s := &Server{
- conf: *srvConf,
- }
-
- s.rpcHandler = NewRpcServerHandler(s.conf.SessionNumber, s.conf.sessionTimeout)
-
- return s
+ remoting.SetServerConfig(defaultServerConfig)
}
-func (s *Server) newSession(session getty.Session) error {
- var (
- ok bool
- tcpConn *net.TCPConn
- )
- conf := s.conf
-
- if conf.GettySessionParam.CompressEncoding {
- session.SetCompressType(getty.CompressZip)
+// rebuildCtx rebuild the context by attachment.
+// Once we decided to transfer more context's key-value, we should change this.
+// now we only support rebuild the tracing context
+func rebuildCtx(inv *invocation.RPCInvocation) context.Context {
+ ctx := context.Background()
+
+ // actually, if user do not use any opentracing framework, the err will not be nil.
+ spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
+ opentracing.TextMapCarrier(inv.Attachments()))
+ if err == nil {
+ ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx)
}
-
- if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
- panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
- }
-
- tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay)
- tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive)
- if conf.GettySessionParam.TcpKeepAlive {
- tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod)
- }
- tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize)
- tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
-
- session.SetName(conf.GettySessionParam.SessionName)
- session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
- session.SetPkgHandler(rpcServerPkgHandler)
- session.SetEventListener(s.rpcHandler)
- session.SetWQLen(conf.GettySessionParam.PkgWQSize)
- session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
- session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
- session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6))
- session.SetWaitTime(conf.GettySessionParam.waitTimeout)
- logger.Debugf("app accepts new session:%s\n", session.Stat())
-
- session.SetTaskPool(srvGrpool)
-
- return nil
+ return ctx
}
-// Start start dubbo server.
-func (s *Server) Start(url common.URL) {
- var (
- addr string
- tcpServer getty.Server
- )
-
- addr = url.Location
- tcpServer = getty.NewTCPServer(
- getty.WithLocalAddress(addr),
- )
- tcpServer.RunEventLoop(s.newSession)
- logger.Debugf("s bind addr{%s} ok!", addr)
- s.tcpServer = tcpServer
-
-}
-
-// Stop stop dubbo server.
-func (s *Server) Stop() {
- s.tcpServer.Close()
+func NewStubHandler() remoting.StubHandler { // NewStubHandler returns the handler that invokes the exported service for an incoming package.
+ return remoting.StubFunc(func(session getty.Session, p *impl.DubboPackage) {
+ u := common.NewURLWithOptions(common.WithPath(p.GetService().Path), common.WithParams(url.Values{}), // URL built only to derive the service key
+ common.WithParamsValue(constant.GROUP_KEY, p.GetService().Group),
+ common.WithParamsValue(constant.INTERFACE_KEY, p.GetService().Interface),
+ common.WithParamsValue(constant.VERSION_KEY, p.GetService().Version))
+
+ exporter, _ := dubboProtocol.ExporterMap().Load(u.ServiceKey()) // second result ignored; a miss is detected by the nil check below
+ if exporter == nil {
+ err := fmt.Errorf("don't have this exporter, key: %s", u.ServiceKey())
+ logger.Errorf(err.Error()) // NOTE(review): error text is used as the format string
+ p.SetResponseStatus(impl.Response_OK) // status is Response_OK even though the body is an error
+ p.SetBody(err)
+ return
+ }
+ invoker := exporter.(protocol.Exporter).GetInvoker() // unchecked assertion: panics if the stored value is not a protocol.Exporter
+ if invoker != nil { // when invoker is nil, p is left unmodified
+ attachments := p.GetBody().(map[string]interface{})["attachments"].(map[string]string) // NOTE(review): unchecked assertions, panic on any other body shape
+ attachments[constant.LOCAL_ADDR] = session.LocalAddr() // writes into the map taken from the body
+ attachments[constant.REMOTE_ADDR] = session.RemoteAddr()
+
+ args := p.GetBody().(map[string]interface{})["args"].([]interface{}) // NOTE(review): unchecked assertions as above
+ inv := invocation.NewRPCInvocation(p.GetService().Method, args, attachments)
+
+ ctx := rebuildCtx(inv) // context derived from the invocation's attachments
+ result := invoker.Invoke(ctx, inv)
+ logger.Debugf("invoker result: %+v", result)
+ if err := result.Error(); err != nil { // failure: payload carries the error and no result object
+ p.SetResponseStatus(impl.Response_OK)
+ p.SetBody(&impl.ResponsePayload{nil, err, result.Attachments()}) // unkeyed literal: field order RspObj, Exception, Attachments
+ } else { // success: payload carries the result and a nil error
+ res := result.Result()
+ p.SetResponseStatus(impl.Response_OK)
+ p.SetBody(&impl.ResponsePayload{res, nil, result.Attachments()}) // unkeyed literal: field order RspObj, Exception, Attachments
+ }
+ }
+ })
 }
diff --git a/protocol/dubbo/listener_test.go b/protocol/dubbo/server_test.go
similarity index 100%
rename from protocol/dubbo/listener_test.go
rename to protocol/dubbo/server_test.go
index 5f80981..aa7d750 100644
--- a/protocol/dubbo/listener_test.go
+++ b/protocol/dubbo/server_test.go
@@ -22,14 +22,14 @@ import (
)
import (
- "github.com/opentracing/opentracing-go"
- "github.com/opentracing/opentracing-go/mocktracer"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol/invocation"
+ "github.com/opentracing/opentracing-go"
+ "github.com/opentracing/opentracing-go/mocktracer"
)
// test rebuild the ctx
diff --git a/test/integrate/dubbo/go-server/server.go b/test/integrate/dubbo/go-server/server.go
index 115bf0a..a5d18db 100644
--- a/test/integrate/dubbo/go-server/server.go
+++ b/test/integrate/dubbo/go-server/server.go
@@ -48,7 +48,7 @@ func main() {
select {
case <-stopC:
// wait getty send resp to consumer
- time.Sleep(3*time.Second)
+ time.Sleep(3 * time.Second)
return
case <-time.After(time.Minute):
panic("provider already running 1 min, but can't be call by consumer")