You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2020/07/06 08:52:30 UTC
[servicecomb-mesher] branch master updated: Add unit test for
coder.go (#122)
This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-mesher.git
The following commit(s) were added to refs/heads/master by this push:
new 797a8b8 Add unit test for coder.go (#122)
797a8b8 is described below
commit 797a8b89d0ed2ae0576d5fe516d3f03ba312733f
Author: t-xinlin <t_...@sina.com>
AuthorDate: Mon Jul 6 16:52:17 2020 +0800
Add unit test for coder.go (#122)
* Remove ratelimiter handler
* Add unit test for bootstrap
* 修改bootstrap适配新的go-chassis
* 修改go.mod适配新的go-chassis
* UPdate travis.yaml
* golint change
* Fix: GoSecure Checker
* Fix: travis.yam;
* Fix bootstrap unit test
* Fix: rm unused package
* Fix: unit test error
* Add unit test to dubbo protocl and http protocol modile.
* Fix: 日志打印.
* Add unit test to dubbo module.
* Add UT for dubbo server.
* Fix: unit test error.
* Fix: unit test error.
* Add unit test .
* formate go file.
* Fix: Unit test error.
* Fix: Unit test error.
* Add unit test for coder.go.
* Add unit test for coder.go
* Add unit test to utils module.
* Impro: go cyclo hecker.
* Add unit test.
* Add unit test.
Co-authored-by: “t_xinlin@sina.com <Happy100>
---
.../client/chassis/dubbo_chassis_client_test.go | 138 +++++++++++
proxy/protocol/dubbo/client/client_conn_test.go | 96 ++++++++
proxy/protocol/dubbo/client/dubbo_client_test.go | 15 +-
proxy/protocol/dubbo/dubbo/codec.go | 86 ++++---
proxy/protocol/dubbo/dubbo/codec_test.go | 222 +++++++++++++++++-
proxy/protocol/dubbo/proxy/dubbo_proxy_ouput.go | 257 ++-------------------
.../protocol/dubbo/proxy/dubbo_proxy_ouput_test.go | 255 ++++++++++++++++++++
proxy/protocol/dubbo/proxy/rest2dubbo.go | 251 --------------------
proxy/protocol/dubbo/utils/buffer.go | 5 +-
proxy/protocol/dubbo/utils/buffer_test.go | 62 ++++-
proxy/protocol/dubbo/utils/msgqueue_test.go | 38 +--
proxy/protocol/dubbo/utils/thrmgr_test.go | 35 ++-
proxy/protocol/dubbo/utils/typeutil_test.go | 123 +++++++++-
13 files changed, 1030 insertions(+), 553 deletions(-)
diff --git a/proxy/protocol/dubbo/client/chassis/dubbo_chassis_client_test.go b/proxy/protocol/dubbo/client/chassis/dubbo_chassis_client_test.go
new file mode 100644
index 0000000..f4310fb
--- /dev/null
+++ b/proxy/protocol/dubbo/client/chassis/dubbo_chassis_client_test.go
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package chassisclient
+
+import (
+ "context"
+ "fmt"
+ mesherCommon "github.com/apache/servicecomb-mesher/proxy/common"
+ dubboClient "github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/client"
+ "github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/dubbo"
+ dubboproxy "github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/proxy"
+ util "github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/utils"
+ "github.com/go-chassis/go-chassis/core/client"
+ "github.com/go-chassis/go-chassis/core/invocation"
+ "github.com/go-chassis/go-chassis/core/lager"
+ "github.com/stretchr/testify/assert"
+ "net"
+ "net/http"
+ "net/http/httptest"
+ "net/url"
+ "os"
+ "testing"
+ "time"
+)
+
+func init() {
+ lager.Init(&lager.Options{LoggerLevel: "INFO", RollingPolicy: "size"})
+}
+
+func TestDubboChassisClient(t *testing.T) {
+ addr := "127.0.0.1:31011"
+ tcpAddr, _ := net.ResolveTCPAddr("tcp", addr)
+ l, _ := net.ListenTCP("tcp", tcpAddr)
+ writeError := false
+ go func(l *net.TCPListener, writeError bool) {
+ conn, _ := l.AcceptTCP()
+ for {
+ buf := make([]byte, dubbo.HeaderLength)
+ _, err := conn.Read(buf)
+ if err != nil {
+ continue
+ }
+ req := new(dubbo.Request)
+ bodyLen := 0
+ coder := dubbo.DubboCodec{}
+ ret := coder.DecodeDubboReqHead(req, buf, &bodyLen)
+ fmt.Println("ret: ", ret)
+ var buffer util.WriteBuffer
+ buffer.Init(0)
+ rsp := &dubbo.DubboRsp{}
+ if !writeError {
+ rsp.SetStatus(dubbo.Ok)
+ }
+ coder.EncodeDubboRsp(rsp, &buffer)
+
+ hf := buffer.GetValidData()
+ conn.Write(hf)
+
+ // case header[0] != MagicHigh
+ if writeError {
+ hf[0] = 0
+ conn.Write(hf)
+ }
+ }
+ }(l, writeError)
+
+ c, err := NewDubboChassisClient(client.Options{
+ Service: "dubbotest",
+ PoolSize: 10,
+ Timeout: time.Second * 10,
+ Endpoint: "127.0.0.1:23101",
+ })
+ assert.NoError(t, err)
+
+ err = c.Close()
+ assert.NoError(t, err)
+ assert.Equal(t, "highway_client", c.String())
+
+ c.GetOptions()
+
+ c.ReloadConfigs(client.Options{
+ Service: "dubbotest",
+ PoolSize: 10,
+ Timeout: time.Second * 10,
+ Endpoint: "127.0.0.1:23101"})
+
+ inv := &invocation.Invocation{}
+ inv.Args = &dubbo.Request{}
+ rsp := &dubboClient.WrapResponse{}
+
+ dubboproxy.DubboListenAddr = addr
+ endPoint := addr
+ os.Setenv(mesherCommon.EnvSpecificAddr, addr)
+ // case endPoint==""
+ err = c.Call(context.Background(), "", inv, rsp)
+ assert.Error(t, err)
+
+ // case endPoint error
+ err = c.Call(context.Background(), "127.0.0.1:23101", inv, rsp)
+ assert.Error(t, err)
+
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(http.StatusOK)
+ }))
+
+ // case service error
+ u, _ := url.Parse(ts.URL)
+ err = c.Call(context.Background(), u.Host, inv, rsp)
+ assert.Error(t, err)
+
+ // case endPoint == dubboproxy.DubboListenAddr
+ err = c.Call(context.Background(), endPoint, inv, rsp)
+ assert.NoError(t, err)
+
+ // writeError == true
+ writeError = true
+ c.Call(context.Background(), endPoint, inv, rsp)
+
+ // writeError == false
+ writeError = false
+ c.Call(context.Background(), endPoint, inv, rsp)
+
+}
diff --git a/proxy/protocol/dubbo/client/client_conn_test.go b/proxy/protocol/dubbo/client/client_conn_test.go
new file mode 100644
index 0000000..940eddd
--- /dev/null
+++ b/proxy/protocol/dubbo/client/client_conn_test.go
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dubboclient
+
+import (
+ "fmt"
+ "github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/dubbo"
+ util "github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/utils"
+ "github.com/stretchr/testify/assert"
+ "net"
+ "testing"
+ "time"
+)
+
+func TestClientConn(t *testing.T) {
+ addr := "127.0.0.1:32100"
+ tcpAddr, _ := net.ResolveTCPAddr("tcp", addr)
+ l, _ := net.ListenTCP("tcp", tcpAddr)
+ go func(l *net.TCPListener) {
+ conn, _ := l.AcceptTCP()
+ for {
+ buf := make([]byte, dubbo.HeaderLength)
+ _, err := conn.Read(buf)
+ if err != nil {
+ continue
+ }
+ req := new(dubbo.Request)
+ bodyLen := 0
+ coder := dubbo.DubboCodec{}
+ ret := coder.DecodeDubboReqHead(req, buf, &bodyLen)
+ fmt.Println("ret: ", ret)
+ var buffer util.WriteBuffer
+ buffer.Init(0)
+ rsp := &dubbo.DubboRsp{}
+ coder.EncodeDubboRsp(rsp, &buffer)
+
+ hf := buffer.GetValidData()
+ conn.Write(hf)
+
+ // case header[0] != MagicHigh
+ hf[0] = 0
+ conn.Write(hf)
+ }
+ }(l)
+
+ c, errDial := net.DialTimeout("tcp", addr, time.Second*5)
+ assert.NoError(t, errDial)
+ conn, ok := c.(*net.TCPConn)
+ assert.Equal(t, true, ok)
+
+ connClinet := NewDubboClientConnetction(conn, NewDubboClient(addr, nil, time.Second*5), nil)
+ go func(c *DubboClientConnection) {
+ t := time.NewTimer(time.Second)
+ for range t.C {
+ c.SendMsg(dubbo.NewDubboRequest())
+ }
+ }(connClinet)
+
+ // Case conn open
+ connClinet.Open()
+
+ // Case conn closed
+ select {
+ case <-time.After(time.Second * 3):
+ conn.Close()
+ connClinet.SendMsg(dubbo.NewDubboRequest())
+ }
+ // case close
+ connClinet.Close()
+ assert.Equal(t, true, connClinet.Closed())
+ connClinet.SendMsg(dubbo.NewDubboRequest())
+
+ select {
+ case <-time.After(time.Second * 5):
+ connClinet.Close()
+ }
+
+ // case conn closed
+ NewDubboClientConnetction(conn, NewDubboClient(addr, nil, time.Second*5), nil)
+
+}
diff --git a/proxy/protocol/dubbo/client/dubbo_client_test.go b/proxy/protocol/dubbo/client/dubbo_client_test.go
index 67a6273..49d75b3 100644
--- a/proxy/protocol/dubbo/client/dubbo_client_test.go
+++ b/proxy/protocol/dubbo/client/dubbo_client_test.go
@@ -25,6 +25,7 @@ import (
"net/http/httptest"
"net/url"
"testing"
+ "time"
)
func init() {
@@ -39,9 +40,8 @@ func TestClientMgr_GetClient(t *testing.T) {
w.WriteHeader(http.StatusOK)
}))
- addr := ts.URL
u, _ := url.Parse(ts.URL)
- addr = u.Host
+ addr := u.Host
clientMgr := NewClientMgr()
// case timeout=0
c, err := clientMgr.GetClient(addr, 0)
@@ -65,6 +65,17 @@ func TestClientMgr_GetClient(t *testing.T) {
// case get addr
c.GetAddr()
+ go func() {
+ c.Close()
+ }()
+
+ select {
+ case <-time.After(time.Second):
+ c.routeMgr.Done()
+ }
+ // case closed
+ c.conn.conn.Close()
+ c.Send(dubbo.NewDubboRequest())
// case net error
ts.Close()
clientMgr.GetClient(addr, 0)
diff --git a/proxy/protocol/dubbo/dubbo/codec.go b/proxy/protocol/dubbo/dubbo/codec.go
index 8c3d8be..2aca409 100644
--- a/proxy/protocol/dubbo/dubbo/codec.go
+++ b/proxy/protocol/dubbo/dubbo/codec.go
@@ -162,63 +162,61 @@ func (p *DubboCodec) DecodeDubboRspBody(buffer *util.ReadBuffer, rsp *DubboRsp)
if rsp.IsHeartbeat() {
rsp.SetValue(HeartBeatEvent)
}
- //获取状态
- if rsp.GetStatus() == Ok {
- if rsp.IsHeartbeat() && (HeartBeatEvent == rsp.GetValue()) {
- //decodeHeartbeatData
+
+ if rsp.GetStatus() != Ok {
+ obj, err = buffer.ReadObject()
+ if err != nil {
+ rsp.SetErrorMsg(err.Error())
+ return 0
+ }
+ if s, ok := obj.(string); !ok {
+ rsp.SetErrorMsg("unknown error")
+ } else {
+ rsp.SetErrorMsg(s)
+ }
+ return 0
+ }
+
+ switch rsp.IsHeartbeat() {
+ case true:
+ //decodeHeartbeatData
+ obj, err = buffer.ReadObject()
+ if err != nil {
+ rsp.SetStatus(ServerError)
+ rsp.SetErrorMsg(err.Error())
+ return 0
+ }
+ case false:
+ //decodeResult
+ valueType, err := buffer.ReadByte()
+ if err != nil {
+ rsp.SetStatus(ServerError)
+ rsp.SetErrorMsg(err.Error())
+ return 0
+ }
+ switch valueType {
+ case ResponseNullValue:
+ //do nothing
+ rsp.SetValue(nil)
+ return 0
+ case ResponseValue:
obj, err = buffer.ReadObject()
if err != nil {
rsp.SetStatus(ServerError)
rsp.SetErrorMsg(err.Error())
- return 0
+ return -1
}
- } else if rsp.mEvent {
- //decodeEventData
+ case ResponseWithException:
+ //readObject,设置异常
+ rsp.SetStatus(ServiceError)
obj, err = buffer.ReadObject()
if err != nil {
rsp.SetStatus(ServerError)
rsp.SetErrorMsg(err.Error())
return 0
}
- } else {
- //decodeResult
- var valueType byte = buffer.ReadByte()
- switch valueType {
- case ResponseNullValue:
- //do nothing
- rsp.SetValue(nil)
- return 0
- case ResponseValue:
- obj, err = buffer.ReadObject()
- if err != nil {
- rsp.SetStatus(ServerError)
- rsp.SetErrorMsg(err.Error())
- return -1
- }
- case ResponseWithException:
- //readObject,设置异常
- rsp.SetStatus(ServiceError)
- obj, err = buffer.ReadObject()
- if err != nil {
- rsp.SetStatus(ServerError)
- rsp.SetErrorMsg(err.Error())
- return 0
- }
- }
}
rsp.SetValue(buffer.GetBuf())
- //rsp.SetValue(obj)
- } else {
- obj, err = buffer.ReadObject()
- if err != nil {
- rsp.SetErrorMsg(err.Error())
- } else {
- if s, ok := obj.(string); !ok {
- rsp.SetErrorMsg("unknown error")
- } else {
- rsp.SetErrorMsg(s)
- }
- }
}
return 0
diff --git a/proxy/protocol/dubbo/dubbo/codec_test.go b/proxy/protocol/dubbo/dubbo/codec_test.go
index 771f5ae..582d9fb 100644
--- a/proxy/protocol/dubbo/dubbo/codec_test.go
+++ b/proxy/protocol/dubbo/dubbo/codec_test.go
@@ -33,7 +33,6 @@ func TestDubboCodec_DecodeDubboReqBody(t *testing.T) {
req := NewDubboRequest()
resp := &DubboRsp{}
resp.Init()
- resp.SetStatus(ServerError)
wbf := &util.WriteBuffer{}
rbf := &util.ReadBuffer{}
@@ -48,6 +47,227 @@ func TestDubboCodec_DecodeDubboReqBody(t *testing.T) {
assert.Nil(t, obj)
assert.Equal(t, Hessian2, d.GetContentTypeID())
+
+ // case EncodeDubboRsp status is ERROR
+ t.Run("Test status error", func(t *testing.T) {
+ resp.SetStatus(ServerError)
+ d.EncodeDubboRsp(resp, wbf)
+ d.DecodeDubboRspBody(rbf, resp)
+ })
+
+ // =====OK===============
+ t.Run("Test status ok", func(t *testing.T) {
+ // case event
+ var buffer util.WriteBuffer
+ buffer.Init(0)
+
+ resp.SetStatus(Ok)
+ resp.SetEvent(true)
+ rbf.SetBuffer(append(buffer.GetBuf()[:buffer.WrittenBytes()], []byte{0x34, 0x02}...))
+ d.EncodeDubboRsp(resp, wbf)
+ d.DecodeDubboRspBody(rbf, resp)
+
+ //case
+ resp.SetStatus(Ok)
+ resp.SetEvent(false)
+ //resp.mEvent = true
+
+ // case ResponseValue
+ buffer.WriteIndex(0)
+ buffer.WriteByte(ResponseValue)
+ rbf.SetBuffer(append(buffer.GetBuf()[:buffer.WrittenBytes()], []byte{0x34, 0x02}...))
+
+ d.EncodeDubboRsp(resp, wbf)
+ d.DecodeDubboRspBody(rbf, resp)
+
+ // case ResponseNullValue
+ resp.SetStatus(Ok)
+ resp.SetEvent(false)
+ buffer.WriteIndex(0)
+ buffer.WriteByte(ResponseNullValue)
+ rbf.SetBuffer(append(buffer.GetBuf()[:buffer.WrittenBytes()], []byte{0x34, 0x02}...))
+
+ d.EncodeDubboRsp(resp, wbf)
+ d.DecodeDubboRspBody(rbf, resp)
+
+ // case ResponseWithException
+ resp.SetStatus(Ok)
+ resp.SetEvent(false)
+ buffer.WriteIndex(0)
+ buffer.WriteByte(ResponseWithException)
+
+ rbf.SetBuffer(append(buffer.GetBuf()[:buffer.WrittenBytes()], []byte{0x34, 0x02}...))
+
+ d.EncodeDubboRsp(resp, wbf)
+ d.DecodeDubboRspBody(rbf, resp)
+ })
+
+ // ResponseNullValue
+ t.Run("Test ResponseNullValue", func(t *testing.T) {
+ var buffer util.WriteBuffer
+ buffer.Init(0)
+
+ buffer.WriteByte(ResponseNullValue)
+ rbf.SetBuffer(buffer.GetValidData())
+ d.DecodeDubboRspBody(rbf, resp)
+
+ // ResponseValue
+ buffer.WriteByte(ResponseValue)
+ buffer.WriteObject("Hello")
+ rbf.SetBuffer(buffer.GetValidData())
+ d.DecodeDubboRspBody(rbf, resp)
+
+ // ResponseWithException
+ buffer.WriteByte(ResponseWithException)
+ rbf.SetBuffer(buffer.GetValidData())
+ d.DecodeDubboRspBody(rbf, resp)
+ })
+
+ // case DecodeDubboReqBodyForRegstry
+ t.Run("Test DecodeDubboReqBody", func(t *testing.T) {
+ var buffer util.WriteBuffer
+ buffer.Init(0)
+
+ rbf := &util.ReadBuffer{}
+
+ req.SetAttachment(DubboVersionKey, "dubbov1")
+ req.SetAttachment(PathKey, "rest")
+ req.SetAttachment(VersionKey, "1.0.0")
+ req.SetVersion(req.GetAttachment(VersionKey, ""))
+ req.SetMethodName("POST")
+
+ buffer.WriteIndex(0)
+
+ buffer.WriteObject("dubbov1")
+ buffer.WriteObject("rest")
+ buffer.WriteObject("1.0.0")
+ buffer.WriteObject("1.0.0")
+
+ buffer.WriteIndex(buffer.WrittenBytes())
+
+ rbf.SetBuffer(buffer.GetValidData()[:])
+ d.DecodeDubboReqBody(req, rbf)
+
+ // case IsHeartbeat
+ req.SetEvent("")
+ buffer.WriteObject("Hello")
+ d.DecodeDubboReqBody(req, rbf)
+
+ rbf.SetBuffer([]byte{0x34, 0x02}) // tag not found
+ d.DecodeDubboReqBody(req, rbf)
+
+ //case IsEvent
+ req.SetEvent("envent")
+ buffer.WriteObject("Hello")
+ d.DecodeDubboReqBody(req, rbf)
+
+ rbf.SetBuffer([]byte{0x34, 0x02}) // tag not found
+ d.DecodeDubboReqBody(req, rbf)
+ })
+
+ t.Run("Test DecodeDubboReqBodyForRegstry", func(t *testing.T) {
+ var buffer util.WriteBuffer
+ buffer.Init(0)
+ req := NewDubboRequest()
+
+ rbf := &util.ReadBuffer{}
+
+ req.SetAttachment(DubboVersionKey, "dubbov1")
+ req.SetAttachment(PathKey, "rest")
+ req.SetAttachment(VersionKey, "1.0.0")
+ req.SetVersion(req.GetAttachment(VersionKey, ""))
+ req.SetMethodName("POST")
+
+ buffer.WriteIndex(0)
+ buffer.WriteObject("dubbov1")
+ buffer.WriteObject("rest")
+ buffer.WriteObject("1.0.0")
+ buffer.WriteObject("1.0.0")
+
+ //处理参数
+ dubboArgs := make([]util.Argument, 2)
+ for i := 0; i < 2; i++ {
+ arg := &util.Argument{}
+ bytesTmp := util.S2ByteSlice([]string{"v1"})
+ arg.Value, err = util.RestBytesToLstValue("string", bytesTmp)
+ arg.JavaType = "Ljava/lang/String;"
+ dubboArgs[i] = *arg
+ }
+ req.SetArguments(dubboArgs)
+
+ //buffer.WriteObject(req.GetAttachments())
+ s := util.GetJavaDesc(dubboArgs)
+ buffer.WriteObject(s)
+
+ rbf.SetBuffer(buffer.GetValidData()[:])
+ d.DecodeDubboReqBodyForRegstry(req, rbf)
+
+ // case IsHeartbeat
+ req.SetEvent("")
+ buffer.WriteObject("Hello")
+ d.DecodeDubboReqBodyForRegstry(req, rbf)
+
+ rbf.SetBuffer([]byte{0x34, 0x02}) // tag not found
+ d.DecodeDubboReqBodyForRegstry(req, rbf)
+
+ //case IsEvent
+ req.SetEvent("envent")
+ buffer.WriteObject("Hello")
+ d.DecodeDubboReqBodyForRegstry(req, rbf)
+
+ rbf.SetBuffer([]byte{0x34, 0x02}) // tag not found
+ d.DecodeDubboReqBodyForRegstry(req, rbf)
+
+ })
+
+ t.Run("Test DecodeDubboReqBodyForRegstry", func(t *testing.T) {
+ var buffer util.WriteBuffer
+ buffer.Init(0)
+ d.EncodeDubboReq(req, &buffer)
+
+ // case IsHeartbeat
+ buffer.WriteIndex(0)
+ req.SetEvent("")
+ d.EncodeDubboReq(req, &buffer)
+
+ //case IsEvent
+ buffer.WriteIndex(0)
+ req.SetEvent("envent")
+ d.EncodeDubboReq(req, &buffer)
+
+ })
+
+ t.Run("Test DecodeDubboReqHead and DecodeDubboRsqHead", func(t *testing.T) {
+ headBuf := make([]byte, HeaderLength)
+ bodyLen := 0
+
+ // case DecodeDubboReqHead
+ d.DecodeDubboReqHead(req, headBuf, &bodyLen)
+ bodyLen = 0
+ // case DecodeDubboRsqHead
+ d.DecodeDubboRsqHead(resp, headBuf, &bodyLen)
+
+ // init for other test case
+ bodyLen = 0
+ util.Short2bytes(Magic, headBuf, 0)
+
+ // case DecodeDubboReqHead
+ d.DecodeDubboReqHead(req, headBuf, &bodyLen)
+ bodyLen = 0
+ // case DecodeDubboRsqHead
+ d.DecodeDubboRsqHead(resp, headBuf, &bodyLen)
+
+ // init for other test case
+ bodyLen = 0
+ util.Short2bytes(Magic, headBuf, 0)
+ headBuf[2] = (byte)(FlagRequest | Hessian2)
+
+ // case DecodeDubboReqHead
+ d.DecodeDubboReqHead(req, headBuf, &bodyLen)
+ bodyLen = 0
+ // case DecodeDubboRsqHead
+ d.DecodeDubboRsqHead(resp, headBuf, &bodyLen)
+ })
// case EncodeDubboRsp
d.EncodeDubboRsp(resp, wbf)
diff --git a/proxy/protocol/dubbo/proxy/dubbo_proxy_ouput.go b/proxy/protocol/dubbo/proxy/dubbo_proxy_ouput.go
index 48bee85..8ac6e97 100755
--- a/proxy/protocol/dubbo/proxy/dubbo_proxy_ouput.go
+++ b/proxy/protocol/dubbo/proxy/dubbo_proxy_ouput.go
@@ -19,12 +19,8 @@ package dubboproxy
import (
"context"
- "encoding/json"
"fmt"
"github.com/apache/servicecomb-mesher/proxy/cmd"
- "net/http"
- "net/url"
-
mesherCommon "github.com/apache/servicecomb-mesher/proxy/common"
mesherRuntime "github.com/apache/servicecomb-mesher/proxy/pkg/runtime"
"github.com/apache/servicecomb-mesher/proxy/protocol"
@@ -33,7 +29,6 @@ import (
"github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/schema"
"github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/utils"
"github.com/apache/servicecomb-mesher/proxy/resolver"
- "github.com/go-chassis/go-chassis/client/rest"
"github.com/go-chassis/go-chassis/core/common"
chassisCommon "github.com/go-chassis/go-chassis/core/common"
"github.com/go-chassis/go-chassis/core/handler"
@@ -41,8 +36,6 @@ import (
"github.com/go-chassis/go-chassis/core/lager"
"github.com/go-chassis/go-chassis/core/loadbalancer"
"github.com/go-chassis/go-chassis/pkg/runtime"
- stringutil "github.com/go-chassis/go-chassis/pkg/string"
- "github.com/go-chassis/go-chassis/pkg/util/httputil"
"github.com/go-chassis/go-chassis/pkg/util/tags"
"github.com/go-chassis/go-chassis/third_party/forked/afex/hystrix-go/hystrix"
"github.com/go-mesh/openlogging"
@@ -71,104 +64,6 @@ func (e ProxyError) Error() string {
return e.Message
}
-//ConvertDubboReqToHTTPReq is a method which converts dubbo requesto to http request
-func ConvertDubboReqToHTTPReq(ctx *dubbo.InvokeContext, dubboReq *dubbo.Request) *http.Request {
- restReq := &http.Request{
- URL: &url.URL{},
- Header: make(http.Header),
- }
- args := dubboReq.GetArguments()
- operateID := dubboReq.GetMethodName()
- iName := dubboReq.GetAttachment(dubbo.PathKey, "")
-
- methd := schema.GetMethodByInterface(iName, operateID)
- if methd == nil {
- lager.Logger.Error("GetMethodByInterface failed: Cannot find the method")
- return nil
- }
- ctx.Method = methd
- restReq.Method = methd.Verb
-
- var (
- i = 0
- qureyNum = 0
- paramsStr = "?"
- body = []byte{}
- )
-
- for i = 0; i < len(args); i++ {
- _, in := methd.GetParamNameAndWhere(i)
- paraSchema := methd.GetParamSchema(i)
- v := args[i]
- if in == schema.InBody {
- b, _ := json.Marshal(v.GetValue())
- body = append(body, b...)
- } else {
- var fmtStr string
- var value string
- if paraSchema.Dtype == util.SchemaArray {
- value = util.ArrayToQueryString(paraSchema.Name, v.GetValue())
- fmtStr += value
- } else {
- value, _ = util.ObjectToString(paraSchema.Dtype, v.GetValue()) // (v.GetValue()).(string)
- if qureyNum == 0 {
- fmtStr = fmt.Sprintf("%s=%s", paraSchema.Name, url.QueryEscape(value))
- qureyNum++
- } else {
- fmtStr = fmt.Sprintf("&%s=%s", paraSchema.Name, url.QueryEscape(value))
- }
- }
- paramsStr += fmtStr
- }
- }
- httputil.SetBody(restReq, body)
-
- uri := methd.Path
- if paramsStr != "?" {
- uri += paramsStr
- }
- httputil.SetURI(restReq, uri)
- tmpName := schema.GetSvcNameByInterface(iName)
- if tmpName == "" {
- lager.Logger.Error("GetSvcNameByInterface failed: Cannot find the svc")
- return nil
- }
- restReq.URL.Host = tmpName // must after setURI
- return restReq
-}
-
-//ConvertRestRspToDubboRsp is a function which converts rest response to dubbo response
-func ConvertRestRspToDubboRsp(ctx *dubbo.InvokeContext, resp *http.Response, dubboRsp *dubbo.DubboRsp) {
- var v interface{}
- var err error
- status := resp.StatusCode
- body := httputil.ReadBody(resp)
- if status >= http.StatusBadRequest {
- dubboRsp.SetStatus(dubbo.ServerError)
- if dubboRsp.GetErrorMsg() == "" && body != nil {
- dubboRsp.SetErrorMsg(string(body))
- }
- return
- }
- dubboRsp.SetStatus(dubbo.Ok)
- if body != nil {
- rspSchema := (*(ctx.Method)).GetRspSchema(status)
- if rspSchema != nil {
- v, err = util.RestByteToValue(rspSchema.DType, body)
- if err != nil {
- dubboRsp.SetStatus(dubbo.BadResponse)
- dubboRsp.SetErrorMsg(err.Error())
- } else {
- dubboRsp.SetValue(v)
- }
- } else {
- dubboRsp.SetErrorMsg(string(body))
- dubboRsp.SetStatus(dubbo.ServerError)
- }
- }
-
-}
-
//SetLocalServiceAddress assign invocation endpoint a local service address
// it uses config in cmd or env fi
// if it is empty, then try to use original port from client as local port
@@ -209,42 +104,40 @@ func Handle(ctx *dubbo.InvokeContext) error {
err = SetLocalServiceAddress(inv) //select local service
if err != nil {
openlogging.GetLogger().Warn(err.Error())
+ IsProvider = false
} else {
IsProvider = true
}
var c *handler.Chain
- if inv.Protocol == "dubbo" {
- //发送请求
- //value := ctx.Req.GetAttachment(ProxyTag, "")
- if !IsProvider || inv.MicroServiceName != runtime.ServiceName { //come from proxyedDubboSvc
- ctx.Req.SetAttachment(common.HeaderSourceName, runtime.ServiceName)
- ctx.Req.SetAttachment(ProxyTag, "true")
-
- if mesherRuntime.Role == mesherCommon.RoleSidecar {
- c, err = handler.GetChain(common.Consumer, mesherCommon.ChainConsumerOutgoing)
- if err != nil {
- openlogging.Error("Get Consumer chain failed: " + err.Error())
- return err
- }
- }
- c.Next(inv, func(ir *invocation.Response) error {
- return handleDubboRequest(inv, ctx, ir)
- })
- } else { //come from other mesher
- ctx.Req.SetAttachment(ProxyTag, "")
- c, err = handler.GetChain(common.Provider, mesherCommon.ChainProviderIncoming)
+ //发送请求
+ //value := ctx.Req.GetAttachment(ProxyTag, "")
+ if !IsProvider || inv.MicroServiceName != runtime.ServiceName { //come from proxyedDubboSvc
+ ctx.Req.SetAttachment(common.HeaderSourceName, runtime.ServiceName)
+ ctx.Req.SetAttachment(ProxyTag, "true")
+
+ if mesherRuntime.Role == mesherCommon.RoleSidecar {
+ c, err = handler.GetChain(common.Consumer, mesherCommon.ChainConsumerOutgoing)
if err != nil {
- openlogging.Error("Get Provider Chain failed: " + err.Error())
+ openlogging.Error("Get Consumer chain failed: " + err.Error())
return err
}
- c.Next(inv, func(ir *invocation.Response) error {
- return handleDubboRequest(inv, ctx, ir)
- })
}
- } else {
- return ProxyRestHandler(ctx)
+ c.Next(inv, func(ir *invocation.Response) error {
+ return handleDubboRequest(inv, ctx, ir)
+ })
+ } else { //come from other mesher
+ ctx.Req.SetAttachment(ProxyTag, "")
+ c, err = handler.GetChain(common.Provider, mesherCommon.ChainProviderIncoming)
+ if err != nil {
+ openlogging.Error("Get Provider Chain failed: " + err.Error())
+ return err
+ }
+ c.Next(inv, func(ir *invocation.Response) error {
+ return handleDubboRequest(inv, ctx, ir)
+ })
}
+
return nil
}
@@ -284,105 +177,3 @@ func handleDubboRequest(inv *invocation.Invocation, ctx *dubbo.InvokeContext, ir
return nil
}
-
-func preHandleToRest(ctx *dubbo.InvokeContext) (*http.Request, *invocation.Invocation, string) {
- restReq := ConvertDubboReqToHTTPReq(ctx, ctx.Req)
- if restReq == nil {
- return nil, nil, ""
- }
- inv := new(invocation.Invocation)
- inv.SourceServiceID = runtime.ServiceID
- inv.Args = restReq
- inv.Protocol = "rest"
- inv.Reply = rest.NewResponse()
- inv.URLPathFormat = restReq.URL.String()
- inv.SchemaID = ""
- inv.OperationID = ""
- inv.Ctx = context.Background()
- err := SetLocalServiceAddress(inv) //select local service
- if err != nil {
- openlogging.Error(err.Error())
- }
- source := stringutil.SplitFirstSep(ctx.RemoteAddr, ":")
- return restReq, inv, source
-}
-
-//ProxyRestHandler is a function
-func ProxyRestHandler(ctx *dubbo.InvokeContext) error {
- var err error
- var c *handler.Chain
-
- req, inv, source := preHandleToRest(ctx)
- if req == nil {
- return &util.BaseError{ErrMsg: "request is invalid "}
- }
-
- source = "127.0.0.1" //"10.57.75.87"
- //Resolve Source
- si := sr.Resolve(source)
- h := make(map[string]string)
- for k := range req.Header {
- h[k] = req.Header.Get(k)
- }
- //Resolve Destination
- destination, _, err := dr.Resolve(source, "", inv.URLPathFormat, h)
- if err != nil {
- return err
- }
- inv.MicroServiceName = destination
- if mesherRuntime.Role == mesherCommon.RoleSidecar {
- c, err = handler.GetChain(common.Consumer, mesherCommon.ChainConsumerOutgoing)
- if err != nil {
- lager.Logger.Error("Get chain failed: " + err.Error())
- return err
- }
- if si == nil {
- lager.Logger.Info("Can not resolve " + source + " to Source info")
- }
- }
-
- c.Next(inv, func(ir *invocation.Response) error {
- //Send the request to the destination
- return handleRequest(ctx, req, inv.Reply.(*http.Response), ctx.Rsp, inv, ir)
- })
- ConvertRestRspToDubboRsp(ctx, inv.Reply.(*http.Response), ctx.Rsp)
- return nil
-}
-
-func handleRequest(ctx *dubbo.InvokeContext, req *http.Request, resp *http.Response,
- dubboRsp *dubbo.DubboRsp, inv *invocation.Invocation, ir *invocation.Response) error {
- if ir != nil {
- if ir.Err != nil {
- switch ir.Err.(type) {
- case hystrix.FallbackNullError:
- resp.StatusCode = http.StatusOK
- dubboRsp.SetErrorMsg(ir.Err.Error())
- case hystrix.CircuitError:
- ir.Status = http.StatusServiceUnavailable
- resp.StatusCode = http.StatusServiceUnavailable
- dubboRsp.SetErrorMsg(ir.Err.Error())
- case loadbalancer.LBError:
- ir.Status = http.StatusBadGateway
- resp.StatusCode = http.StatusBadGateway
- dubboRsp.SetErrorMsg(ir.Err.Error())
- default:
- ir.Status = http.StatusInternalServerError
- resp.StatusCode = http.StatusInternalServerError
- dubboRsp.SetErrorMsg(ir.Err.Error())
- }
- return ir.Err
- }
- if inv.Endpoint == "" {
- ir.Status = http.StatusInternalServerError
- resp.StatusCode = http.StatusInternalServerError
- dubboRsp.SetErrorMsg(ir.Err.Error())
- return protocol.ErrUnknown
- }
- } else {
- dubboRsp.SetErrorMsg(protocol.ErrUnExpectedHandlerChainResponse.Error())
- return protocol.ErrUnExpectedHandlerChainResponse
- }
-
- ir.Status = resp.StatusCode
- return nil
-}
diff --git a/proxy/protocol/dubbo/proxy/dubbo_proxy_ouput_test.go b/proxy/protocol/dubbo/proxy/dubbo_proxy_ouput_test.go
new file mode 100644
index 0000000..9962491
--- /dev/null
+++ b/proxy/protocol/dubbo/proxy/dubbo_proxy_ouput_test.go
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dubboproxy
+
+import (
+ "fmt"
+ "github.com/apache/servicecomb-mesher/proxy/bootstrap"
+ "github.com/apache/servicecomb-mesher/proxy/cmd"
+ "github.com/apache/servicecomb-mesher/proxy/common"
+ mesherCommon "github.com/apache/servicecomb-mesher/proxy/common"
+ mesherRuntime "github.com/apache/servicecomb-mesher/proxy/pkg/runtime"
+ dubboclient "github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/client"
+ "github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/dubbo"
+ "github.com/go-chassis/go-chassis"
+ chassisCommon "github.com/go-chassis/go-chassis/core/common"
+ "github.com/go-chassis/go-chassis/core/config"
+ "github.com/go-chassis/go-chassis/core/config/model"
+ "github.com/go-chassis/go-chassis/core/handler"
+ "github.com/go-chassis/go-chassis/core/invocation"
+ "github.com/go-chassis/go-chassis/core/lager"
+ "github.com/go-chassis/go-chassis/core/loadbalancer"
+ "github.com/go-chassis/go-chassis/core/registry"
+ "github.com/go-chassis/go-chassis/third_party/forked/afex/hystrix-go/hystrix"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+ "os"
+ "strings"
+ "testing"
+
+ // rate limiter handler
+ _ "github.com/go-chassis/go-chassis/middleware/ratelimiter"
+)
+
+func init() {
+ lager.Init(&lager.Options{LoggerLevel: "DEBUG"})
+}
+
+func TestSetLocalServiceAddress(t *testing.T) {
+ t.Run("Run not set env EnvServicePorts", func(t *testing.T) {
+ inv := &invocation.Invocation{Protocol: "rest"}
+ cmd.Init()
+ _ = cmd.Configs.GeneratePortsMap()
+ t.Log(cmd.Configs.PortsMap)
+ err := SetLocalServiceAddress(inv)
+ assert.Error(t, err)
+
+ // case port
+ inv.Port = "8080"
+ err = SetLocalServiceAddress(inv)
+ assert.NoError(t, err)
+ })
+
+ t.Run("Run with env EnvServicePorts", func(t *testing.T) {
+ inv := &invocation.Invocation{Protocol: "rest"}
+ os.Setenv(common.EnvServicePorts, "rest:8080,grpc:90")
+ cmd.Init()
+ _ = cmd.Configs.GeneratePortsMap()
+ t.Log(cmd.Configs.PortsMap)
+ err := SetLocalServiceAddress(inv)
+ assert.NoError(t, err)
+ })
+}
+
+func TestHandle(t *testing.T) {
+ t.Run("Run as Provider", func(t *testing.T) {
+ os.Setenv(common.EnvServicePorts, "dubbo:8081,rest:8080")
+ cmd.Init()
+ _ = cmd.Configs.GeneratePortsMap()
+
+ protoMap := make(map[string]model.Protocol)
+ config.GlobalDefinition = &model.GlobalCfg{
+ Cse: model.CseStruct{
+ Protocols: protoMap,
+ },
+ }
+
+ bootstrap.RegisterFramework()
+ bootstrap.SetHandlers()
+ chassis.Init()
+
+ consumerChain := strings.Join([]string{
+ handler.Router,
+ //"ratelimiter-consumer",
+ //"bizkeeper-consumer",
+ //handler.Loadbalance,
+ //handler.Transport,
+ }, ",")
+ providerChain := strings.Join([]string{
+ //handler.RateLimiterProvider,
+ //handler.Transport,
+ }, ",")
+ consumerChainMap := map[string]string{
+ common.ChainConsumerOutgoing: consumerChain,
+ }
+ providerChainMap := map[string]string{
+ common.ChainProviderIncoming: providerChain,
+ "default": handler.RateLimiterProvider,
+ }
+
+ registry.DefaultContractDiscoveryService = new(MockContractDiscoveryService)
+ mesherRuntime.Role = mesherCommon.RoleSidecar
+
+ req := dubbo.NewDubboRequest()
+ req.SetAttachment(dubbo.PathKey, "hello")
+ ctx := &dubbo.InvokeContext{req, &dubbo.DubboRsp{}, nil, "", "127.0.0.1:9090"}
+ ctx.Rsp.Init()
+
+ // case get chain error
+ err := Handle(ctx)
+ assert.Error(t, err)
+
+ // case get chain ok
+ handler.CreateChains(chassisCommon.Provider, providerChainMap)
+ handler.CreateChains(chassisCommon.Consumer, consumerChainMap)
+ err = Handle(ctx)
+ assert.NoError(t, err)
+
+ handler.ChainMap = make(map[string]*handler.Chain)
+ })
+
+ t.Run("Run as Consumer", func(t *testing.T) {
+ //os.Setenv(common.EnvServicePorts, "rest:8080,grpc:90")
+ os.Unsetenv(common.EnvServicePorts)
+ cmd.Init()
+ _ = cmd.Configs.GeneratePortsMap()
+
+ protoMap := make(map[string]model.Protocol)
+ config.GlobalDefinition = &model.GlobalCfg{
+ Cse: model.CseStruct{
+ Protocols: protoMap,
+ },
+ }
+
+ bootstrap.RegisterFramework()
+ bootstrap.SetHandlers()
+ chassis.Init()
+
+ consumerChain := strings.Join([]string{
+ handler.Router,
+ //"ratelimiter-consumer",
+ //"bizkeeper-consumer",
+ //handler.Loadbalance,
+ //handler.Transport,
+ }, ",")
+ providerChain := strings.Join([]string{
+ //handler.RateLimiterProvider,
+ //handler.Transport,
+ }, ",")
+ consumerChainMap := map[string]string{
+ common.ChainConsumerOutgoing: consumerChain,
+ }
+ providerChainMap := map[string]string{
+ common.ChainProviderIncoming: providerChain,
+ "default": handler.RateLimiterProvider,
+ }
+
+ registry.DefaultContractDiscoveryService = new(MockContractDiscoveryService)
+ mesherRuntime.Role = mesherCommon.RoleSidecar
+
+ req := dubbo.NewDubboRequest()
+ req.SetAttachment(dubbo.PathKey, "hello")
+ ctx := &dubbo.InvokeContext{req, &dubbo.DubboRsp{}, nil, "", "127.0.0.1:9090"}
+ ctx.Rsp.Init()
+
+ // case get chain error
+ err := Handle(ctx)
+ assert.Error(t, err)
+
+ // case get chain ok
+ handler.CreateChains(chassisCommon.Provider, providerChainMap)
+ handler.CreateChains(chassisCommon.Consumer, consumerChainMap)
+ err = Handle(ctx)
+ assert.NoError(t, err)
+
+ handler.ChainMap = make(map[string]*handler.Chain)
+ })
+}
+
+func Test_handleDubboRequest(t *testing.T) {
+ req := dubbo.NewDubboRequest()
+ req.SetAttachment(dubbo.PathKey, "hello")
+ ctx := &dubbo.InvokeContext{req, &dubbo.DubboRsp{}, nil, "", "127.0.0.1:9090"}
+ ctx.Rsp.Init()
+
+ inv := &invocation.Invocation{Protocol: "rest"}
+ ir := &invocation.Response{}
+ inv.Endpoint = "127.0.0.1:8080"
+
+ // case responese ir.Result = nil
+ handleDubboRequest(inv, ctx, ir)
+
+ // case responese ir.Result != nil
+ ir.Result = &dubboclient.WrapResponse{Resp: &dubbo.DubboRsp{}}
+ handleDubboRequest(inv, ctx, ir)
+
+ // Case ir.Err == hystrix.FallbackNullError
+ ir.Err = hystrix.FallbackNullError{"Error."}
+ handleDubboRequest(inv, ctx, ir)
+ // Case ir.Err == hystrix.CircuitError:
+ ir.Err = hystrix.CircuitError{"Error."}
+ handleDubboRequest(inv, ctx, ir)
+ // Case ir.Err == loadbalancer.LBError
+ ir.Err = loadbalancer.LBError{"Error."}
+ handleDubboRequest(inv, ctx, ir)
+ // Case ir.Err == other
+ ir.Err = fmt.Errorf("Other error.")
+ handleDubboRequest(inv, ctx, ir)
+
+ // case ir == nil
+ handleDubboRequest(inv, ctx, nil)
+
+}
+
+// ContractDiscoveryService struct for disco mock
+type MockContractDiscoveryService struct {
+ mock.Mock
+}
+
+func (m *MockContractDiscoveryService) GetMicroServicesByInterface(interfaceName string) (microservices []*registry.MicroService) {
+ microservices = append(microservices, ®istry.MicroService{})
+ return
+}
+
+func (m *MockContractDiscoveryService) GetSchemaContentByInterface(interfaceName string) registry.SchemaContent {
+ return registry.SchemaContent{}
+}
+
+func (m *MockContractDiscoveryService) GetSchemaContentByServiceName(svcName, version, appID, env string) []*registry.SchemaContent {
+ var sc []*registry.SchemaContent
+ sc = append(sc, ®istry.SchemaContent{
+ Paths: map[string]map[string]registry.MethodInfo{
+ "hello": {},
+ },
+ })
+ return nil
+}
+
+func (m *MockContractDiscoveryService) Close() error {
+ return nil
+}
diff --git a/proxy/protocol/dubbo/proxy/rest2dubbo.go b/proxy/protocol/dubbo/proxy/rest2dubbo.go
deleted file mode 100755
index ade84d9..0000000
--- a/proxy/protocol/dubbo/proxy/rest2dubbo.go
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package dubboproxy
-
-import (
- "fmt"
- "github.com/go-chassis/go-chassis/pkg/runtime"
- "io/ioutil"
- "net/http"
- "net/url"
- "strings"
-
- mesherCommon "github.com/apache/servicecomb-mesher/proxy/common"
- "github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/client"
- "github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/dubbo"
- "github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/schema"
- "github.com/apache/servicecomb-mesher/proxy/protocol/dubbo/utils"
-
- "github.com/apache/servicecomb-mesher/proxy/protocol"
- "github.com/go-chassis/go-chassis/core/common"
- "github.com/go-chassis/go-chassis/core/handler"
- "github.com/go-chassis/go-chassis/core/invocation"
- "github.com/go-chassis/go-chassis/core/loadbalancer"
- "github.com/go-chassis/go-chassis/pkg/string"
- "github.com/go-chassis/go-chassis/pkg/util/tags"
- "github.com/go-chassis/go-chassis/third_party/forked/afex/hystrix-go/hystrix"
- "github.com/go-mesh/openlogging"
-)
-
-//ConvertDubboRspToRestRsp is a function which converts dubbo response to rest response
-func ConvertDubboRspToRestRsp(dubboRsp *dubbo.DubboRsp, w http.ResponseWriter, ctx *dubbo.InvokeContext) error {
- status := dubboRsp.GetStatus()
- if status == dubbo.Ok {
- w.WriteHeader(http.StatusOK)
- rspSchema := (*(ctx.Method)).GetRspSchema(http.StatusOK)
- if rspSchema != nil {
- v, err := util.ObjectToString(rspSchema.DType, dubboRsp.GetValue())
- if err != nil {
- w.WriteHeader(http.StatusInternalServerError)
- } else {
- if _, err := w.Write([]byte(v)); err != nil {
- return err
- }
- }
- } else {
- w.WriteHeader(http.StatusInternalServerError)
- }
- } else {
- w.WriteHeader(http.StatusInternalServerError)
- }
- return nil
-}
-
-//ConvertHTTPReqToDubboReq is a function which converts http request in to dubbo request
-func ConvertHTTPReqToDubboReq(restReq *http.Request, ctx *dubbo.InvokeContext, inv *invocation.Invocation) error {
- req := ctx.Req
- uri := restReq.URL
- i := 0
- var dubboArgs []util.Argument
- queryAgrs := uri.Query()
- arg := &util.Argument{}
-
- svcSchema, methd := schema.GetSchemaMethodBySvcURL(inv.MicroServiceName, "", inv.RouteTags.Version(), inv.RouteTags.AppID(),
- strings.ToLower(restReq.Method), string(restReq.URL.String()))
- if methd == nil {
- return &util.BaseError{"Method not been found"}
- }
- req.SetMethodName(methd.OperaID)
- req.SetAttachment(dubbo.DubboVersionKey, dubbo.DubboVersion)
- req.SetAttachment(dubbo.PathKey, svcSchema.Info["x-java-interface"]) //interfaceSchema.JavaClsName
- req.SetAttachment(dubbo.VersionKey, "0.0.0")
- ctx.Method = methd
- var err error
-
- //处理参数
- dubboArgs = make([]util.Argument, len(methd.Paras))
-
- for _, v := range methd.Paras {
- var byteTmp []byte
- var bytesTmp [][]byte
- itemType := "string" //默认为string
- if strings.EqualFold(v.Where, "query") {
- byteTmp = []byte(queryAgrs.Get(v.Name))
- } else if restReq.Body != nil {
- byteTmp, _ = ioutil.ReadAll(restReq.Body)
- }
- if byteTmp == nil && v.Required {
- return &util.BaseError{"Param is null"}
- }
- var realJvmType string
- bytesTmp, realJvmType = getJVMType(v, arg, bytesTmp, restReq.URL)
- if bytesTmp == nil {
- arg.Value, err = util.RestByteToValue(arg.JavaType, byteTmp)
- if err != nil {
- return err
- }
- } else {
- arg.Value, err = util.RestBytesToLstValue(itemType, bytesTmp)
- if err != nil {
- return err
- }
- }
-
- if realJvmType != "" {
- arg.JavaType = realJvmType
- }
- dubboArgs[i] = *arg
- i++
- }
-
- req.SetArguments(dubboArgs)
-
- return nil
-}
-
-func getJVMType(v schema.MethParam, arg *util.Argument, bytesTmp [][]byte, queryAgrs *url.URL) ([][]byte, string) {
- var realJvmType string
- queryAgrsTmp := queryAgrs.Query()
- if _, ok := util.SchemeTypeMAP[v.Dtype]; ok {
- arg.JavaType = util.SchemeTypeMAP[v.Dtype]
- if v.Dtype == util.SchemaArray {
- realJvmType = util.JavaList
- if v.Items != nil {
- if val, ok := v.Items["x-java-class"]; ok {
- realJvmType = fmt.Sprintf("L%s;", val)
- }
- if valType, ok := v.Items["type"]; ok {
- realJvmType = fmt.Sprintf("L%s;", valType)
- }
- }
- bytesTmp = util.S2ByteSlice(queryAgrsTmp[v.Name])
- } else if arg.JavaType == util.JavaObject {
- realJvmType = fmt.Sprintf("L%s;", v.ObjRef.JvmClsName)
- if v.AdditionalProps != nil { //处理map
- if val, ok := v.AdditionalProps["x-java-class"]; ok {
- realJvmType = fmt.Sprintf("L%s;", val)
- } else {
- realJvmType = util.JavaMap
- }
- }
- }
- //Lcom.alibaba.dubbo.demo.user; need convert to Lcom/alibaba/dubbo/demo/User;
- realJvmType = strings.Replace(realJvmType, ".", "/", -1)
- }
- return bytesTmp, realJvmType
-}
-
-func preHandleToDubbo(req *http.Request) (*invocation.Invocation, string) {
- inv := new(invocation.Invocation)
- inv.MicroServiceName = runtime.ServiceName
- inv.RouteTags = utiltags.NewDefaultTag(runtime.Version, runtime.App)
-
- inv.Protocol = "dubbo"
- inv.URLPathFormat = req.URL.Path
- inv.Reply = &dubboclient.WrapResponse{nil}
- source := stringutil.SplitFirstSep(req.RemoteAddr, ":")
- return inv, source
-}
-
-//TransparentForwardHandler is a function
-func TransparentForwardHandler(w http.ResponseWriter, r *http.Request) {
- inv, _ := preHandleToDubbo(r)
- dubboCtx := &dubbo.InvokeContext{dubbo.NewDubboRequest(), &dubbo.DubboRsp{}, nil, "", ""}
- err := ConvertHTTPReqToDubboReq(r, dubboCtx, inv)
- if err != nil {
- openlogging.Error("Invalid Request: " + err.Error())
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
- inv.Args = dubboCtx.Req
-
- c, err := handler.GetChain(common.Provider, mesherCommon.ChainProviderIncoming)
- if err != nil {
- openlogging.Error("Get Chain failed: " + err.Error())
- return
- }
- c.Next(inv, func(ir *invocation.Response) error {
- return handleRequestForDubbo(w, inv, ir)
- })
- dubboRsp := inv.Reply.(*dubboclient.WrapResponse).Resp
- if dubboRsp != nil {
- err := ConvertDubboRspToRestRsp(dubboRsp, w, dubboCtx)
- if err != nil {
- w.WriteHeader(http.StatusInternalServerError)
- w.Write(stringutil.Str2bytes(err.Error()))
- }
- }
-}
-
-func handleRequestForDubbo(w http.ResponseWriter, inv *invocation.Invocation, ir *invocation.Response) error {
- if ir != nil {
- var err error
- if ir.Err != nil {
- switch ir.Err.(type) {
- case hystrix.FallbackNullError:
- w.WriteHeader(http.StatusOK)
- ir.Status = http.StatusOK
- case hystrix.CircuitError:
- w.WriteHeader(http.StatusServiceUnavailable)
- ir.Status = http.StatusServiceUnavailable
- _, err = w.Write([]byte(ir.Err.Error()))
- case loadbalancer.LBError:
- w.WriteHeader(http.StatusBadGateway)
- ir.Status = http.StatusBadGateway
- _, err = w.Write([]byte(ir.Err.Error()))
- default:
- w.WriteHeader(http.StatusInternalServerError)
- ir.Status = http.StatusInternalServerError
- _, err = w.Write([]byte(ir.Err.Error()))
- }
- if err != nil {
- return err
- }
-
- return ir.Err
- }
- if inv.Endpoint == "" {
- w.WriteHeader(http.StatusInternalServerError)
- ir.Status = http.StatusInternalServerError
- _, err = w.Write([]byte(protocol.ErrUnknown.Error()))
- if err != nil {
- return err
- }
- return protocol.ErrUnknown
- }
- } else {
- w.WriteHeader(http.StatusInternalServerError)
- _, err := w.Write([]byte(protocol.ErrUnExpectedHandlerChainResponse.Error()))
- if err != nil {
- return err
- }
- return protocol.ErrUnExpectedHandlerChainResponse
- }
-
- return nil
-}
diff --git a/proxy/protocol/dubbo/utils/buffer.go b/proxy/protocol/dubbo/utils/buffer.go
index 5617266..ef69af5 100644
--- a/proxy/protocol/dubbo/utils/buffer.go
+++ b/proxy/protocol/dubbo/utils/buffer.go
@@ -162,13 +162,14 @@ func (b *ReadBuffer) GetBuf() []byte {
}
//ReadByte is a method to read particular byte from buffer
-func (b *ReadBuffer) ReadByte() byte {
+func (b *ReadBuffer) ReadByte() (byte, error) {
var tmp interface{}
tmp, err := b.ReadObject()
if err != nil {
openlogging.Error(err.Error())
+ return byte(0), err
}
- return byte(tmp.(int32))
+ return byte(tmp.(int32)), nil
}
//ReadBytes is a method to read data from buffer
diff --git a/proxy/protocol/dubbo/utils/buffer_test.go b/proxy/protocol/dubbo/utils/buffer_test.go
index 4be9347..693c73a 100644
--- a/proxy/protocol/dubbo/utils/buffer_test.go
+++ b/proxy/protocol/dubbo/utils/buffer_test.go
@@ -68,13 +68,15 @@ func TestReadBuffe(t *testing.T) {
var buffer WriteBuffer
buffer.Init(DefaultBufferSize)
+ var readBuffer ReadBuffer
+
// Write byte
err := buffer.WriteByte(byte(12))
assert.NoError(t, err)
- var readBuffer ReadBuffer
readBuffer.SetBuffer(buffer.GetBuf())
- b := readBuffer.ReadByte()
+ b, err := readBuffer.ReadByte()
+ assert.NoError(t, err)
assert.Equal(t, byte(12), b)
// Write Object
@@ -97,4 +99,60 @@ func TestReadBuffe(t *testing.T) {
str := readBuffer.ReadString()
assert.Equal(t, "string01", str)
+
+ // case read byte error
+ buffer.WriteIndex(0)
+ readBuffer.SetBuffer(append(buffer.GetBuf()[:buffer.WrittenBytes()], []byte{0x34, 0x02}...))
+
+ b, err = readBuffer.ReadByte()
+ assert.Error(t, err)
+
+ buffer.Init(5)
+ // case test GetValidData()
+ buffer.GetValidData()
+
+ // case buffer.WriteIndex(0)
+ buffer.WriteIndex(buffer.capacity + 1)
+
+ //case b.capacity < (b.wrInd + size
+ buffer.WriteIndex(0)
+ badBuf := make([]byte, DefaultGrowSize+1)
+ buffer.WriteBytes(badBuf)
+
+ //case Not enough space to write
+ buffer.Init(0)
+ //buffer.WriteIndex(0)
+ badBuf = make([]byte, 10)
+ buffer.Write(badBuf)
+
+ // case GetBuf
+ readBuffer.GetBuf()
+
+ buffer.WriteIndex(0)
+ readBuffer.SetBuffer(append(buffer.GetBuf()[:buffer.WrittenBytes()], []byte{0x34, 0x02}...))
+
+ str = readBuffer.ReadString()
+ assert.Equal(t, "", str)
+
+ // Read map error
+ buffer.WriteIndex(0)
+ readBuffer.SetBuffer([]byte{0x34, 0x02})
+ _, err = readBuffer.ReadMap()
+ assert.Equal(t, "", str)
+
+ tmpBuf := make([]byte, 5)
+ readBuffer.SetBuffer([]byte{0x34, 0x02})
+ _, err = readBuffer.Read(tmpBuf)
+ assert.Equal(t, "", str)
+
+ tmpBuf = make([]byte, 2)
+ readBuffer.SetBuffer([]byte{0x34, 0x02})
+ readBuffer.rdInd = 2
+ _, err = readBuffer.Read(tmpBuf)
+ assert.Equal(t, "", str)
+
+ // Test Base Error
+ baseErr := BaseError{"BaseError"}
+ assert.Equal(t, "BaseError", baseErr.Error())
+
}
diff --git a/proxy/protocol/dubbo/utils/msgqueue_test.go b/proxy/protocol/dubbo/utils/msgqueue_test.go
index 50bd428..2cac3e2 100644
--- a/proxy/protocol/dubbo/utils/msgqueue_test.go
+++ b/proxy/protocol/dubbo/utils/msgqueue_test.go
@@ -26,7 +26,7 @@ import (
)
func NewMsgQueueForTest(maxMsgNum int) *MsgQueue {
- q := new(MsgQueue)
+ q := NewMsgQueue()
q.msgList = list.New()
q.mtx = new(sync.Mutex)
q.msgCount = 0
@@ -39,7 +39,7 @@ func NewMsgQueueForTest(maxMsgNum int) *MsgQueue {
func TestMsgQueue(t *testing.T) {
t.Run("case empty", func(t *testing.T) {
- q := NewMsgQueueForTest(2)
+ q := NewMsgQueueForTest(3)
// append msg
eMSG := "msg to send"
@@ -56,23 +56,34 @@ func TestMsgQueue(t *testing.T) {
// case empty
done := make(chan struct{})
- go func(done chan struct{}) {
- dMSG, err = q.Dequeue()
- assert.NoError(t, err)
-
- // Deavtive
- q.Deavtive()
+ go func() {
+ ticker := time.NewTimer(time.Second)
+ for range ticker.C {
+ err = q.Enqueue(eMSG)
+ assert.NoError(t, err)
+ }
- // error
- _, err = q.Dequeue()
- assert.Error(t, err)
+ }()
- if done != nil {
- close(done)
+ go func(done chan struct{}) {
+ ticker := time.NewTimer(time.Second * 6)
+ for range ticker.C {
+ q.Deavtive()
+ if done != nil {
+ close(done)
+ }
}
}(done)
+ go func() {
+ for i := 0; i < 20; i++ {
+ q.Dequeue()
+ time.Sleep(time.Second)
+ }
+
+ }()
+
select {
case <-time.After(time.Second * 10):
case <-done:
@@ -92,7 +103,6 @@ func TestMsgQueue(t *testing.T) {
done1 := make(chan struct{})
go func(c chan struct{}) {
err = q1.Enqueue(eMSG)
- t.Log(err)
assert.Error(t, err)
if c != nil {
diff --git a/proxy/protocol/dubbo/utils/thrmgr_test.go b/proxy/protocol/dubbo/utils/thrmgr_test.go
index 8f2a50d..4b28be6 100644
--- a/proxy/protocol/dubbo/utils/thrmgr_test.go
+++ b/proxy/protocol/dubbo/utils/thrmgr_test.go
@@ -34,14 +34,47 @@ func TestNewThreadGroupWait(t *testing.T) {
defer tgw.Done()
count++
}(tgw)
+
tgw.Wait()
close(done)
}(done)
select {
- case <-time.After(time.Second * 2):
+ case <-time.After(time.Second * 5):
case <-done:
}
assert.Equal(t, 1, count)
+ // case done count < 0
+ tgw := NewThreadGroupWait()
+ tgw.Done()
+ tgw.Done()
+}
+
+type Task struct {
+}
+
+func (t *Task) Svc(interface{}) interface{} {
+ return nil
+}
+
+func TestThrmgr(t *testing.T) {
+ nr := NewRoutineManager()
+ var done chan struct{}
+ go func(done chan struct{}) {
+ nr.Wait()
+ close(done)
+ }(done)
+
+ time.AfterFunc(time.Second*2, func() {
+ nr.Done()
+ })
+
+ select {
+ case <-time.After(time.Second * 5):
+ case <-done:
+ }
+
+ nr.Spawn(new(Task), "swap", "routinename")
+
}
diff --git a/proxy/protocol/dubbo/utils/typeutil_test.go b/proxy/protocol/dubbo/utils/typeutil_test.go
index e861e86..b269a4c 100644
--- a/proxy/protocol/dubbo/utils/typeutil_test.go
+++ b/proxy/protocol/dubbo/utils/typeutil_test.go
@@ -18,6 +18,7 @@
package util
import (
+ "encoding/json"
"github.com/stretchr/testify/assert"
"testing"
)
@@ -25,8 +26,13 @@ import (
func TestArrayToQueryString(t *testing.T) {
key := "key_01"
value := []interface{}{"value01", "value02"}
- t.Log(ArrayToQueryString(key, value))
+ s := ArrayToQueryString(key, value)
+ t.Log(s)
+ assert.Equal(t, `key_01=value01&key_01=value02`, s)
+ // case not []interface{} type
+ s = ArrayToQueryString(key, "")
+ assert.Equal(t, "", s)
}
func TestObjectToString(t *testing.T) {
@@ -39,6 +45,10 @@ func TestObjectToString(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, "", str)
+ // case Unsurported Type
+ str, err = ObjectToString(JavaChar, "string01")
+ assert.Error(t, err)
+
// case java byte
str, err = ObjectToString(JavaByte, "9")
assert.NoError(t, err)
@@ -113,7 +123,6 @@ func TestRestByteToValue(t *testing.T) {
bytes := make([]byte, 2)
v := int(11)
Short2bytes(v, bytes, 0)
-
str, err = RestByteToValue(JavaShort, bytes)
assert.NoError(t, err)
assert.Equal(t, int16(v), str)
@@ -122,16 +131,100 @@ func TestRestByteToValue(t *testing.T) {
bytes = make([]byte, 4)
v = int(11)
Int2bytes(v, bytes, 0)
-
str, err = RestByteToValue(JavaInteger, bytes)
assert.NoError(t, err)
assert.Equal(t, int32(v), str)
+ // case Java Char byte
+ bytes = make([]byte, 4)
+ bytes[0] = byte(0)
+ bytes[1] = byte(1)
+ bytes[2] = byte(2)
+ bytes[3] = byte(3)
+ str, err = RestByteToValue(JavaChar, bytes)
+ str, err = RestByteToValue(JavaByte, bytes)
+ assert.NoError(t, err)
+ assert.Equal(t, bytes, str)
+
+ // case Java long
+ bytes = make([]byte, 4)
+ bytes[0] = byte(0)
+ bytes[1] = byte(1)
+ bytes[2] = byte(2)
+ bytes[3] = byte(3)
+ str, err = RestByteToValue(JavaLong, bytes)
+ assert.NoError(t, err)
+
+ // case Java float
+ bytes = make([]byte, 4)
+ bytes[0] = byte(0)
+ bytes[1] = byte(1)
+ bytes[2] = byte(2)
+ bytes[3] = byte(3)
+ str, err = RestByteToValue(JavaFloat, bytes)
+ assert.NoError(t, err)
+
+ // case Java double
+ bytes = make([]byte, 8)
+ bytes[0] = byte(0)
+ bytes[1] = byte(1)
+ bytes[2] = byte(2)
+ bytes[3] = byte(3)
+ str, err = RestByteToValue(JavaDouble, bytes)
+ assert.NoError(t, err)
+
+ // case Java boolean
+ bytes = make([]byte, 8)
+ bytes[0] = byte(0)
+ bytes[1] = byte(1)
+ bytes[2] = byte(2)
+ bytes[3] = byte(3)
+ str, err = RestByteToValue(JavaBoolean, bytes)
+ assert.Error(t, err)
+
+ // case Java avaArray, SchemaArray:
+ str, err = RestByteToValue(JavaArray, bytes)
+ assert.Error(t, err)
+ str, err = RestByteToValue(SchemaArray, bytes)
+ assert.Error(t, err)
+
+ // case JavaObject, SchemaObject:
+ str, err = RestByteToValue(JavaObject, bytes)
+ assert.Error(t, err)
+ str, err = RestByteToValue(SchemaObject, bytes)
+ assert.Error(t, err)
+
+ type jsonObj struct {
+ Name string `json:"name"`
+ }
+
+ bs, err := json.Marshal(jsonObj{"name"})
+ assert.NoError(t, err)
+
+ str, err = RestByteToValue(SchemaObject, bs)
+ assert.NoError(t, err)
+ m, ok := str.(map[string]interface{})
+ assert.Equal(t, true, ok)
+ assert.Equal(t, m["name"], "name")
+
+ str, err = RestByteToValue(JavaObject, bs)
+ assert.NoError(t, err)
+ m, ok = str.(map[string]interface{})
+ assert.Equal(t, true, ok)
+ assert.Equal(t, m["name"], "name")
+
+ str, err = RestByteToValue("not fount tag", bs)
+ assert.Error(t, err)
}
func TestTypeDesToArgsObjArry(t *testing.T) {
arg := TypeDesToArgsObjArry(JavaString)
t.Log(arg)
+
+ // case empty
+ arg = TypeDesToArgsObjArry("")
+ t.Log(arg)
+ assert.Equal(t, 1, len(arg))
}
func TestArgumen(t *testing.T) {
@@ -145,3 +238,27 @@ func TestArgumen(t *testing.T) {
assert.Equal(t, 99, arg.GetValue())
}
+
+func TestGetJavaDesc(t *testing.T) {
+ arg := make([]Argument, 1)
+ arg = append(arg, Argument{JavaType: JavaString, Value: "string"})
+ str := GetJavaDesc(arg)
+ assert.Equal(t, JavaString, str)
+}
+
+func TestRestBytesToLstValue(t *testing.T) {
+ arg := &Argument{}
+ var err error
+ bytesTmp := S2ByteSlice([]string{"v1"})
+
+ // case type error
+ arg.Value, err = RestBytesToLstValue("Not a type", bytesTmp)
+ assert.Error(t, err)
+
+ // case array empty
+ arg.Value, err = RestBytesToLstValue(JavaString, make([][]byte, 0))
+ assert.NoError(t, err)
+
+ arg.Value, err = RestBytesToLstValue(JavaString, bytesTmp)
+ assert.NoError(t, err)
+}