You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2021/06/18 02:02:38 UTC
[apisix-go-plugin-runner] branch master updated: fix: handle err before out (#9)
This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-go-plugin-runner.git
The following commit(s) were added to refs/heads/master by this push:
new dffe3be fix: handle err before out (#9)
dffe3be is described below
commit dffe3beae93be9ea632d84763c911e66b6130c28
Author: 罗泽轩 <sp...@gmail.com>
AuthorDate: Fri Jun 18 10:02:30 2021 +0800
fix: handle err before out (#9)
---
internal/server/server.go | 72 +++++++++++++++++++++++++++---------------
internal/server/server_test.go | 29 +++++++++++++++++
pkg/http/http.go | 3 +-
3 files changed, 77 insertions(+), 27 deletions(-)
diff --git a/internal/server/server.go b/internal/server/server.go
index 798d83a..e25083e 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -45,6 +45,11 @@ const (
RPCError = iota
RPCPrepareConf
RPCHTTPReqCall
+ RPCTest = 127 // used only in test
+)
+
+var (
+ dealRPCTest func(buf []byte) (*flatbuffers.Builder, error)
)
func readErr(n int, err error, required int) bool {
@@ -66,6 +71,46 @@ func writeErr(n int, err error) {
}
}
+func generateErrorReport(err error) (ty byte, out []byte) {
+ log.Errorf("%s", err)
+
+ ty = RPCError
+ bd := ReportError(err)
+ out = bd.FinishedBytes()
+ util.PutBuilder(bd)
+ return
+}
+
+func dispatchRPC(ty byte, in []byte) (byte, []byte) {
+ var err error
+ var bd *flatbuffers.Builder
+ switch ty {
+ case RPCPrepareConf:
+ bd, err = plugin.PrepareConf(in)
+ case RPCHTTPReqCall:
+ bd, err = plugin.HTTPReqCall(in)
+ case RPCTest: // Just for test
+ bd, err = dealRPCTest(in)
+ default:
+ err = UnknownType{ty}
+ }
+
+ var out []byte
+ if err != nil {
+ ty, out = generateErrorReport(err)
+ } else {
+ out = bd.FinishedBytes()
+ util.PutBuilder(bd)
+ size := len(out)
+ if size > MaxDataSize {
+ err = fmt.Errorf("the max length of data is %d but got %d", MaxDataSize, size)
+ ty, out = generateErrorReport(err)
+ }
+ }
+
+ return ty, out
+}
+
func handleConn(c net.Conn) {
defer func() {
if err := recover(); err != nil {
@@ -97,48 +142,23 @@ func handleConn(c net.Conn) {
break
}
- var bd *flatbuffers.Builder
- switch ty {
- case RPCPrepareConf:
- bd, err = plugin.PrepareConf(buf)
- case RPCHTTPReqCall:
- bd, err = plugin.HTTPReqCall(buf)
- default:
- err = UnknownType{ty}
- }
+ ty, out := dispatchRPC(ty, buf)
- out := bd.FinishedBytes()
size := len(out)
- if size > MaxDataSize {
- err = fmt.Errorf("the max length of data is %d but got %d", MaxDataSize, size)
- }
-
- if err != nil {
- log.Errorf("%s", err)
-
- ty = RPCError
- util.PutBuilder(bd)
- bd = ReportError(err)
- out = bd.FinishedBytes()
- }
-
binary.BigEndian.PutUint32(header, uint32(size))
header[0] = ty
n, err = c.Write(header)
if err != nil {
writeErr(n, err)
- util.PutBuilder(bd)
break
}
n, err = c.Write(out)
if err != nil {
writeErr(n, err)
- util.PutBuilder(bd)
break
}
- util.PutBuilder(bd)
}
}
diff --git a/internal/server/server_test.go b/internal/server/server_test.go
index 497e073..c7285eb 100644
--- a/internal/server/server_test.go
+++ b/internal/server/server_test.go
@@ -23,7 +23,11 @@ import (
"testing"
"time"
+ hrc "github.com/api7/ext-plugin-proto/go/A6/HTTPReqCall"
+ flatbuffers "github.com/google/flatbuffers/go"
"github.com/stretchr/testify/assert"
+
+ "github.com/apache/apisix-go-plugin-runner/internal/util"
)
func TestGetSockAddr(t *testing.T) {
@@ -34,6 +38,31 @@ func TestGetSockAddr(t *testing.T) {
assert.Equal(t, "/tmp/x.sock", getSockAddr())
}
+func TestDispatchRPC_UnknownType(t *testing.T) {
+ ty, _ := dispatchRPC(126, []byte(""))
+ assert.Equal(t, byte(RPCError), ty)
+}
+
+func TestDispatchRPC_OutTooLarge(t *testing.T) {
+ dealRPCTest = func(buf []byte) (*flatbuffers.Builder, error) {
+ builder := util.GetBuilder()
+ bodyVec := builder.CreateByteVector(make([]byte, MaxDataSize+1))
+ hrc.StopStart(builder)
+ hrc.StopAddBody(builder, bodyVec)
+ stop := hrc.StopEnd(builder)
+
+ hrc.RespStart(builder)
+ hrc.RespAddId(builder, 1)
+ hrc.RespAddActionType(builder, hrc.ActionStop)
+ hrc.RespAddAction(builder, stop)
+ res := hrc.RespEnd(builder)
+ builder.Finish(res)
+ return builder, nil
+ }
+ ty, _ := dispatchRPC(RPCTest, []byte(""))
+ assert.Equal(t, byte(RPCError), ty)
+}
+
func TestRun(t *testing.T) {
path := "/tmp/x.sock"
addr := "unix:" + path
diff --git a/pkg/http/http.go b/pkg/http/http.go
index d7ca549..cf00132 100644
--- a/pkg/http/http.go
+++ b/pkg/http/http.go
@@ -34,7 +34,8 @@ type Request interface {
ID() uint32
// ConfToken returns the token represents the configuration of current route.
// Each route have its unique token, so we can use it to distinguish different
- // route in the same plugin.
+ // route in the same plugin. When the configuration of a route changed, the token
+ // will change too.
ConfToken() uint32
// SrcIP returns the client's IP