You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by so...@apache.org on 2022/12/27 05:55:14 UTC

[apisix-go-plugin-runner] branch master updated: fix(#114): fix transfering large body failed (#124)

This is an automated email from the ASF dual-hosted git repository.

soulbird pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-go-plugin-runner.git


The following commit(s) were added to refs/heads/master by this push:
     new b941b73  fix(#114): fix transfering large body failed (#124)
b941b73 is described below

commit b941b73439d0aa98fc2d99c94a195034e616a562
Author: dongjunduo <an...@gmail.com>
AuthorDate: Tue Dec 27 13:55:09 2022 +0800

    fix(#114): fix transfering large body failed (#124)
    
    * fix(#114): fix transfering large body failed
---
 internal/http/request.go              |  8 ++---
 internal/http/request_test.go         | 22 ++++++------
 internal/http/response.go             |  8 ++---
 internal/http/response_test.go        | 22 ++++++------
 internal/server/server.go             |  8 ++---
 internal/server/server_test.go        |  2 +-
 internal/util/msg.go                  | 25 +++++++++++++
 internal/util/{msg.go => msg_test.go} | 67 +++++++++++++++--------------------
 8 files changed, 88 insertions(+), 74 deletions(-)

diff --git a/internal/http/request.go b/internal/http/request.go
index 561e1b7..5c00172 100644
--- a/internal/http/request.go
+++ b/internal/http/request.go
@@ -366,19 +366,19 @@ func (r *Request) askExtraInfo(builder *flatbuffers.Builder,
 	binary.BigEndian.PutUint32(header, uint32(size))
 	header[0] = util.RPCExtraInfo
 
-	n, err := c.Write(header)
+	n, err := util.WriteBytes(c, header, len(header))
 	if err != nil {
 		util.WriteErr(n, err)
 		return nil, common.ErrConnClosed
 	}
 
-	n, err = c.Write(out)
+	n, err = util.WriteBytes(c, out, size)
 	if err != nil {
 		util.WriteErr(n, err)
 		return nil, common.ErrConnClosed
 	}
 
-	n, err = c.Read(header)
+	n, err = util.ReadBytes(c, header, util.HeaderLen)
 	if util.ReadErr(n, err, util.HeaderLen) {
 		return nil, common.ErrConnClosed
 	}
@@ -390,7 +390,7 @@ func (r *Request) askExtraInfo(builder *flatbuffers.Builder,
 	log.Infof("receive rpc type: %d data length: %d", ty, length)
 
 	buf := make([]byte, length)
-	n, err = c.Read(buf)
+	n, err = util.ReadBytes(c, buf, int(length))
 	if util.ReadErr(n, err, int(length)) {
 		return nil, common.ErrConnClosed
 	}
diff --git a/internal/http/request_test.go b/internal/http/request_test.go
index 51fe954..d6b58ed 100644
--- a/internal/http/request_test.go
+++ b/internal/http/request_test.go
@@ -305,7 +305,7 @@ func TestVar(t *testing.T) {
 
 	go func() {
 		header := make([]byte, util.HeaderLen)
-		n, err := sc.Read(header)
+		n, err := util.ReadBytes(sc, header, util.HeaderLen)
 		if util.ReadErr(n, err, util.HeaderLen) {
 			return
 		}
@@ -316,7 +316,7 @@ func TestVar(t *testing.T) {
 		length := binary.BigEndian.Uint32(header)
 
 		buf := make([]byte, length)
-		n, err = sc.Read(buf)
+		n, err = util.ReadBytes(sc, buf, int(length))
 		if util.ReadErr(n, err, int(length)) {
 			return
 		}
@@ -336,13 +336,13 @@ func TestVar(t *testing.T) {
 		binary.BigEndian.PutUint32(header, uint32(size))
 		header[0] = util.RPCExtraInfo
 
-		n, err = sc.Write(header)
+		n, err = util.WriteBytes(sc, header, len(header))
 		if err != nil {
 			util.WriteErr(n, err)
 			return
 		}
 
-		n, err = sc.Write(out)
+		n, err = util.WriteBytes(sc, out, size)
 		if err != nil {
 			util.WriteErr(n, err)
 			return
@@ -365,7 +365,7 @@ func TestVar_FailedToSendExtraInfoReq(t *testing.T) {
 
 	go func() {
 		header := make([]byte, util.HeaderLen)
-		n, err := sc.Read(header)
+		n, err := util.ReadBytes(sc, header, util.HeaderLen)
 		if util.ReadErr(n, err, util.HeaderLen) {
 			return
 		}
@@ -385,7 +385,7 @@ func TestVar_FailedToReadExtraInfoResp(t *testing.T) {
 
 	go func() {
 		header := make([]byte, util.HeaderLen)
-		n, err := sc.Read(header)
+		n, err := util.ReadBytes(sc, header, util.HeaderLen)
 		if util.ReadErr(n, err, util.HeaderLen) {
 			return
 		}
@@ -396,7 +396,7 @@ func TestVar_FailedToReadExtraInfoResp(t *testing.T) {
 		length := binary.BigEndian.Uint32(header)
 
 		buf := make([]byte, length)
-		n, err = sc.Read(buf)
+		n, err = util.ReadBytes(sc, buf, int(length))
 		if util.ReadErr(n, err, int(length)) {
 			return
 		}
@@ -458,7 +458,7 @@ func TestBody(t *testing.T) {
 
 	go func() {
 		header := make([]byte, util.HeaderLen)
-		n, err := sc.Read(header)
+		n, err := util.ReadBytes(sc, header, util.HeaderLen)
 		if util.ReadErr(n, err, util.HeaderLen) {
 			return
 		}
@@ -469,7 +469,7 @@ func TestBody(t *testing.T) {
 		length := binary.BigEndian.Uint32(header)
 
 		buf := make([]byte, length)
-		n, err = sc.Read(buf)
+		n, err = util.ReadBytes(sc, buf, int(length))
 		if util.ReadErr(n, err, int(length)) {
 			return
 		}
@@ -488,13 +488,13 @@ func TestBody(t *testing.T) {
 		binary.BigEndian.PutUint32(header, uint32(size))
 		header[0] = util.RPCExtraInfo
 
-		n, err = sc.Write(header)
+		n, err = util.WriteBytes(sc, header, len(header))
 		if err != nil {
 			util.WriteErr(n, err)
 			return
 		}
 
-		n, err = sc.Write(out)
+		n, err = util.WriteBytes(sc, out, size)
 		if err != nil {
 			util.WriteErr(n, err)
 			return
diff --git a/internal/http/response.go b/internal/http/response.go
index 7c97f42..7d7870e 100644
--- a/internal/http/response.go
+++ b/internal/http/response.go
@@ -73,19 +73,19 @@ func (r *Response) askExtraInfo(builder *flatbuffers.Builder,
 	binary.BigEndian.PutUint32(header, uint32(size))
 	header[0] = util.RPCExtraInfo
 
-	n, err := c.Write(header)
+	n, err := util.WriteBytes(c, header, len(header))
 	if err != nil {
 		util.WriteErr(n, err)
 		return nil, common.ErrConnClosed
 	}
 
-	n, err = c.Write(out)
+	n, err = util.WriteBytes(c, out, size)
 	if err != nil {
 		util.WriteErr(n, err)
 		return nil, common.ErrConnClosed
 	}
 
-	n, err = c.Read(header)
+	n, err = util.ReadBytes(c, header, util.HeaderLen)
 	if util.ReadErr(n, err, util.HeaderLen) {
 		return nil, common.ErrConnClosed
 	}
@@ -97,7 +97,7 @@ func (r *Response) askExtraInfo(builder *flatbuffers.Builder,
 	log.Infof("receive rpc type: %d data length: %d", ty, length)
 
 	buf := make([]byte, length)
-	n, err = c.Read(buf)
+	n, err = util.ReadBytes(c, buf, int(length))
 	if util.ReadErr(n, err, int(length)) {
 		return nil, common.ErrConnClosed
 	}
diff --git a/internal/http/response_test.go b/internal/http/response_test.go
index 128bf73..4062dd6 100644
--- a/internal/http/response_test.go
+++ b/internal/http/response_test.go
@@ -202,7 +202,7 @@ func TestResponse_Var(t *testing.T) {
 
 	go func() {
 		header := make([]byte, util.HeaderLen)
-		n, err := sc.Read(header)
+		n, err := util.ReadBytes(sc, header, util.HeaderLen)
 		if util.ReadErr(n, err, util.HeaderLen) {
 			return
 		}
@@ -213,7 +213,7 @@ func TestResponse_Var(t *testing.T) {
 		length := binary.BigEndian.Uint32(header)
 
 		buf := make([]byte, length)
-		n, err = sc.Read(buf)
+		n, err = util.ReadBytes(sc, buf, int(length))
 		if util.ReadErr(n, err, int(length)) {
 			return
 		}
@@ -233,13 +233,13 @@ func TestResponse_Var(t *testing.T) {
 		binary.BigEndian.PutUint32(header, uint32(size))
 		header[0] = util.RPCExtraInfo
 
-		n, err = sc.Write(header)
+		n, err = util.WriteBytes(sc, header, len(header))
 		if err != nil {
 			util.WriteErr(n, err)
 			return
 		}
 
-		n, err = sc.Write(out)
+		n, err = util.WriteBytes(sc, out, size)
 		if err != nil {
 			util.WriteErr(n, err)
 			return
@@ -262,7 +262,7 @@ func TestResponse_Var_FailedToSendExtraInfoReq(t *testing.T) {
 
 	go func() {
 		header := make([]byte, util.HeaderLen)
-		n, err := sc.Read(header)
+		n, err := util.ReadBytes(sc, header, util.HeaderLen)
 		if util.ReadErr(n, err, util.HeaderLen) {
 			return
 		}
@@ -282,7 +282,7 @@ func TestResponse_FailedToReadExtraInfoResp(t *testing.T) {
 
 	go func() {
 		header := make([]byte, util.HeaderLen)
-		n, err := sc.Read(header)
+		n, err := util.ReadBytes(sc, header, util.HeaderLen)
 		if util.ReadErr(n, err, util.HeaderLen) {
 			return
 		}
@@ -293,7 +293,7 @@ func TestResponse_FailedToReadExtraInfoResp(t *testing.T) {
 		length := binary.BigEndian.Uint32(header)
 
 		buf := make([]byte, length)
-		n, err = sc.Read(buf)
+		n, err = util.ReadBytes(sc, buf, int(length))
 		if util.ReadErr(n, err, int(length)) {
 			return
 		}
@@ -314,7 +314,7 @@ func TestRead(t *testing.T) {
 
 	go func() {
 		header := make([]byte, util.HeaderLen)
-		n, err := sc.Read(header)
+		n, err := util.ReadBytes(sc, header, util.HeaderLen)
 		if util.ReadErr(n, err, util.HeaderLen) {
 			return
 		}
@@ -325,7 +325,7 @@ func TestRead(t *testing.T) {
 		length := binary.BigEndian.Uint32(header)
 
 		buf := make([]byte, length)
-		n, err = sc.Read(buf)
+		n, err = util.ReadBytes(sc, buf, int(length))
 		if util.ReadErr(n, err, int(length)) {
 			return
 		}
@@ -344,13 +344,13 @@ func TestRead(t *testing.T) {
 		binary.BigEndian.PutUint32(header, uint32(size))
 		header[0] = util.RPCExtraInfo
 
-		n, err = sc.Write(header)
+		n, err = util.WriteBytes(sc, header, len(header))
 		if err != nil {
 			util.WriteErr(n, err)
 			return
 		}
 
-		n, err = sc.Write(out)
+		n, err = util.WriteBytes(sc, out, size)
 		if err != nil {
 			util.WriteErr(n, err)
 			return
diff --git a/internal/server/server.go b/internal/server/server.go
index 5ac291f..e8a0d8c 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -117,7 +117,7 @@ func handleConn(c net.Conn) {
 
 	header := make([]byte, util.HeaderLen)
 	for {
-		n, err := c.Read(header)
+		n, err := util.ReadBytes(c, header, util.HeaderLen)
 		if util.ReadErr(n, err, util.HeaderLen) {
 			break
 		}
@@ -131,7 +131,7 @@ func handleConn(c net.Conn) {
 		log.Infof("receive rpc type: %d data length: %d", ty, length)
 
 		buf := make([]byte, length)
-		n, err = c.Read(buf)
+		n, err = util.ReadBytes(c, buf, int(length))
 		if util.ReadErr(n, err, int(length)) {
 			break
 		}
@@ -142,13 +142,13 @@ func handleConn(c net.Conn) {
 		binary.BigEndian.PutUint32(header, uint32(size))
 		header[0] = respTy
 
-		n, err = c.Write(header)
+		n, err = util.WriteBytes(c, header, len(header))
 		if err != nil {
 			util.WriteErr(n, err)
 			break
 		}
 
-		n, err = c.Write(out)
+		n, err = util.WriteBytes(c, out, size)
 		if err != nil {
 			util.WriteErr(n, err)
 			break
diff --git a/internal/server/server_test.go b/internal/server/server_test.go
index b854973..a976b18 100644
--- a/internal/server/server_test.go
+++ b/internal/server/server_test.go
@@ -121,7 +121,7 @@ func TestRun(t *testing.T) {
 		conn, err := net.DialTimeout("unix", addr[len("unix:"):], 1*time.Second)
 		assert.NotNil(t, conn, err)
 		defer conn.Close()
-		conn.Write(c.header)
+		util.WriteBytes(conn, c.header, len(c.header))
 	}
 
 	syscall.Kill(syscall.Getpid(), syscall.SIGINT)
diff --git a/internal/util/msg.go b/internal/util/msg.go
index f95e2d9..78121ff 100644
--- a/internal/util/msg.go
+++ b/internal/util/msg.go
@@ -20,6 +20,7 @@ package util
 import (
 	"fmt"
 	"io"
+	"net"
 
 	flatbuffers "github.com/google/flatbuffers/go"
 
@@ -65,3 +66,27 @@ func WriteErr(n int, err error) {
 		log.Errorf("write: %s", err)
 	}
 }
+
+func ReadBytes(c net.Conn, b []byte, n int) (int, error) {
+	l := 0
+	for l < n {
+		tmp, err := c.Read(b[l:])
+		if err != nil {
+			return l + tmp, err
+		}
+		l += tmp
+	}
+	return l, nil
+}
+
+func WriteBytes(c net.Conn, b []byte, n int) (int, error) {
+	l := 0
+	for l < n {
+		tmp, err := c.Write(b[l:])
+		if err != nil {
+			return l + tmp, err
+		}
+		l += tmp
+	}
+	return l, nil
+}
diff --git a/internal/util/msg.go b/internal/util/msg_test.go
similarity index 53%
copy from internal/util/msg.go
copy to internal/util/msg_test.go
index f95e2d9..24a526e 100644
--- a/internal/util/msg.go
+++ b/internal/util/msg_test.go
@@ -18,50 +18,39 @@
 package util
 
 import (
-	"fmt"
-	"io"
+	"math/rand"
+	"net"
+	"testing"
+	"time"
 
-	flatbuffers "github.com/google/flatbuffers/go"
-
-	"github.com/apache/apisix-go-plugin-runner/pkg/log"
+	"github.com/stretchr/testify/assert"
 )
 
-const (
-	HeaderLen   = 4
-	MaxDataSize = 2<<24 - 1
-)
+func TestReadAndWriteBytes(t *testing.T) {
+	path := "/tmp/test.sock"
+	server, err := net.Listen("unix", path)
+	assert.NoError(t, err)
+	defer server.Close()
 
-const (
-	RPCError = iota
-	RPCPrepareConf
-	RPCHTTPReqCall
-	RPCExtraInfo
-	RPCHTTPRespCall
-)
-
-type RPCResult struct {
-	Err     error
-	Builder *flatbuffers.Builder
-}
+	// transfer large enough data
+	n := 10000000
 
-// Use struct if the result is not only []byte
-type ExtraInfoResult []byte
-
-func ReadErr(n int, err error, required int) bool {
-	if 0 < n && n < required {
-		err = fmt.Errorf("truncated, only get the first %d bytes", n)
-	}
-	if err != nil {
-		if err != io.EOF {
-			log.Errorf("read: %s", err)
-		}
-		return true
+	const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
+	in := make([]byte, n)
+	for i := range in {
+		in[i] = letterBytes[rand.Intn(len(letterBytes))]
 	}
-	return false
-}
 
-func WriteErr(n int, err error) {
-	if err != nil {
-		log.Errorf("write: %s", err)
-	}
+	go func() {
+		client, err := net.DialTimeout("unix", path, 1*time.Second)
+		assert.NoError(t, err)
+		defer client.Close()
+		WriteBytes(client, in, len(in))
+	}()
+
+	fd, err := server.Accept()
+	assert.NoError(t, err)
+	out := make([]byte, n)
+	ReadBytes(fd, out, n)
+	assert.Equal(t, in, out)
 }