You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2021/06/18 02:02:38 UTC

[apisix-go-plugin-runner] branch master updated: fix: handle err before out (#9)

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-go-plugin-runner.git


The following commit(s) were added to refs/heads/master by this push:
     new dffe3be  fix: handle err before out (#9)
dffe3be is described below

commit dffe3beae93be9ea632d84763c911e66b6130c28
Author: 罗泽轩 <sp...@gmail.com>
AuthorDate: Fri Jun 18 10:02:30 2021 +0800

    fix: handle err before out (#9)
---
 internal/server/server.go      | 72 +++++++++++++++++++++++++++---------------
 internal/server/server_test.go | 29 +++++++++++++++++
 pkg/http/http.go               |  3 +-
 3 files changed, 77 insertions(+), 27 deletions(-)

diff --git a/internal/server/server.go b/internal/server/server.go
index 798d83a..e25083e 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -45,6 +45,11 @@ const (
 	RPCError = iota
 	RPCPrepareConf
 	RPCHTTPReqCall
+	RPCTest = 127 // used only in test
+)
+
+var (
+	dealRPCTest func(buf []byte) (*flatbuffers.Builder, error)
 )
 
 func readErr(n int, err error, required int) bool {
@@ -66,6 +71,46 @@ func writeErr(n int, err error) {
 	}
 }
 
+func generateErrorReport(err error) (ty byte, out []byte) {
+	log.Errorf("%s", err)
+
+	ty = RPCError
+	bd := ReportError(err)
+	out = bd.FinishedBytes()
+	util.PutBuilder(bd)
+	return
+}
+
+func dispatchRPC(ty byte, in []byte) (byte, []byte) {
+	var err error
+	var bd *flatbuffers.Builder
+	switch ty {
+	case RPCPrepareConf:
+		bd, err = plugin.PrepareConf(in)
+	case RPCHTTPReqCall:
+		bd, err = plugin.HTTPReqCall(in)
+	case RPCTest: // Just for test
+		bd, err = dealRPCTest(in)
+	default:
+		err = UnknownType{ty}
+	}
+
+	var out []byte
+	if err != nil {
+		ty, out = generateErrorReport(err)
+	} else {
+		out = bd.FinishedBytes()
+		util.PutBuilder(bd)
+		size := len(out)
+		if size > MaxDataSize {
+			err = fmt.Errorf("the max length of data is %d but got %d", MaxDataSize, size)
+			ty, out = generateErrorReport(err)
+		}
+	}
+
+	return ty, out
+}
+
 func handleConn(c net.Conn) {
 	defer func() {
 		if err := recover(); err != nil {
@@ -97,48 +142,23 @@ func handleConn(c net.Conn) {
 			break
 		}
 
-		var bd *flatbuffers.Builder
-		switch ty {
-		case RPCPrepareConf:
-			bd, err = plugin.PrepareConf(buf)
-		case RPCHTTPReqCall:
-			bd, err = plugin.HTTPReqCall(buf)
-		default:
-			err = UnknownType{ty}
-		}
+		ty, out := dispatchRPC(ty, buf)
 
-		out := bd.FinishedBytes()
 		size := len(out)
-		if size > MaxDataSize {
-			err = fmt.Errorf("the max length of data is %d but got %d", MaxDataSize, size)
-		}
-
-		if err != nil {
-			log.Errorf("%s", err)
-
-			ty = RPCError
-			util.PutBuilder(bd)
-			bd = ReportError(err)
-			out = bd.FinishedBytes()
-		}
-
 		binary.BigEndian.PutUint32(header, uint32(size))
 		header[0] = ty
 
 		n, err = c.Write(header)
 		if err != nil {
 			writeErr(n, err)
-			util.PutBuilder(bd)
 			break
 		}
 
 		n, err = c.Write(out)
 		if err != nil {
 			writeErr(n, err)
-			util.PutBuilder(bd)
 			break
 		}
-		util.PutBuilder(bd)
 	}
 }
 
diff --git a/internal/server/server_test.go b/internal/server/server_test.go
index 497e073..c7285eb 100644
--- a/internal/server/server_test.go
+++ b/internal/server/server_test.go
@@ -23,7 +23,11 @@ import (
 	"testing"
 	"time"
 
+	hrc "github.com/api7/ext-plugin-proto/go/A6/HTTPReqCall"
+	flatbuffers "github.com/google/flatbuffers/go"
 	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/apisix-go-plugin-runner/internal/util"
 )
 
 func TestGetSockAddr(t *testing.T) {
@@ -34,6 +38,31 @@ func TestGetSockAddr(t *testing.T) {
 	assert.Equal(t, "/tmp/x.sock", getSockAddr())
 }
 
+func TestDispatchRPC_UnknownType(t *testing.T) {
+	ty, _ := dispatchRPC(126, []byte(""))
+	assert.Equal(t, byte(RPCError), ty)
+}
+
+func TestDispatchRPC_OutTooLarge(t *testing.T) {
+	dealRPCTest = func(buf []byte) (*flatbuffers.Builder, error) {
+		builder := util.GetBuilder()
+		bodyVec := builder.CreateByteVector(make([]byte, MaxDataSize+1))
+		hrc.StopStart(builder)
+		hrc.StopAddBody(builder, bodyVec)
+		stop := hrc.StopEnd(builder)
+
+		hrc.RespStart(builder)
+		hrc.RespAddId(builder, 1)
+		hrc.RespAddActionType(builder, hrc.ActionStop)
+		hrc.RespAddAction(builder, stop)
+		res := hrc.RespEnd(builder)
+		builder.Finish(res)
+		return builder, nil
+	}
+	ty, _ := dispatchRPC(RPCTest, []byte(""))
+	assert.Equal(t, byte(RPCError), ty)
+}
+
 func TestRun(t *testing.T) {
 	path := "/tmp/x.sock"
 	addr := "unix:" + path
diff --git a/pkg/http/http.go b/pkg/http/http.go
index d7ca549..cf00132 100644
--- a/pkg/http/http.go
+++ b/pkg/http/http.go
@@ -34,7 +34,8 @@ type Request interface {
 	ID() uint32
 	// ConfToken returns the token represents the configuration of current route.
 	// Each route have its unique token, so we can use it to distinguish different
-	// route in the same plugin.
+	// route in the same plugin. When the configuration of a route changed, the token
+	// will change too.
 	ConfToken() uint32
 
 	// SrcIP returns the client's IP