You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2021/05/31 02:07:16 UTC

[apisix-go-plugin-runner] 09/22: fix: concurrent issues

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-go-plugin-runner.git

commit e3d2617b44efc9268ac19a0498aea82c03ed567b
Author: spacewander <sp...@gmail.com>
AuthorDate: Thu May 20 17:01:05 2021 +0800

    fix: concurrent issues
---
 internal/plugin/conf.go                    | 19 ++++-------
 internal/plugin/conf_test.go               | 54 ++++++++++++++++++++++++++----
 internal/server/error.go                   | 11 +++---
 internal/server/error_test.go              |  6 ++--
 internal/server/server.go                  | 14 ++++++--
 internal/{server/error.go => util/pool.go} | 36 ++++++++------------
 6 files changed, 87 insertions(+), 53 deletions(-)

diff --git a/internal/plugin/conf.go b/internal/plugin/conf.go
index 6c79cb6..8e18ecc 100644
--- a/internal/plugin/conf.go
+++ b/internal/plugin/conf.go
@@ -16,9 +16,11 @@ package plugin
 
 import (
 	"strconv"
+	"sync/atomic"
 	"time"
 
 	"github.com/ReneKroon/ttlcache/v2"
+	"github.com/apache/apisix-go-plugin-runner/internal/util"
 	A6 "github.com/api7/ext-plugin-proto/go/A6"
 	pc "github.com/api7/ext-plugin-proto/go/A6/PrepareConf"
 	flatbuffers "github.com/google/flatbuffers/go"
@@ -31,8 +33,6 @@ type ConfEntry struct {
 type RuleConf []ConfEntry
 
 var (
-	builder = flatbuffers.NewBuilder(1024)
-
 	cache        *ttlcache.Cache
 	cacheCounter uint32 = 0
 )
@@ -41,20 +41,14 @@ func InitConfCache(ttl time.Duration) {
 	cache = ttlcache.NewCache()
 	cache.SetTTL(ttl)
 	cache.SkipTTLExtensionOnHit(false)
+	cacheCounter = 0
 }
 
 func genCacheToken() uint32 {
-	cacheCounter++
-	if cacheCounter == 0 {
-		// overflow, skip 0 which means none
-		cacheCounter++
-	}
-	return cacheCounter
+	return atomic.AddUint32(&cacheCounter, 1)
 }
 
-func PrepareConf(buf []byte) ([]byte, error) {
-	builder.Reset()
-
+func PrepareConf(buf []byte) (*flatbuffers.Builder, error) {
 	req := pc.GetRootAsReq(buf, 0)
 	entries := make(RuleConf, req.ConfLength())
 
@@ -72,11 +66,12 @@ func PrepareConf(buf []byte) ([]byte, error) {
 		return nil, err
 	}
 
+	builder := util.GetBuilder()
 	pc.RespStart(builder)
 	pc.RespAddConfToken(builder, token)
 	root := pc.RespEnd(builder)
 	builder.Finish(root)
-	return builder.FinishedBytes(), nil
+	return builder, nil
 }
 
 func GetRuleConf(token uint32) (RuleConf, error) {
diff --git a/internal/plugin/conf_test.go b/internal/plugin/conf_test.go
index b4135a5..aa2ded4 100644
--- a/internal/plugin/conf_test.go
+++ b/internal/plugin/conf_test.go
@@ -15,6 +15,8 @@
 package plugin
 
 import (
+	"sort"
+	"sync"
 	"testing"
 	"time"
 
@@ -34,15 +36,52 @@ func TestPrepareConf(t *testing.T) {
 	builder.Finish(root)
 	b := builder.FinishedBytes()
 
-	out, _ := PrepareConf(b)
+	bd, _ := PrepareConf(b)
+	out := bd.FinishedBytes()
 	resp := pc.GetRootAsResp(out, 0)
 	assert.Equal(t, uint32(1), resp.ConfToken())
 
-	out, _ = PrepareConf(b)
+	bd, _ = PrepareConf(b)
+	out = bd.FinishedBytes()
 	resp = pc.GetRootAsResp(out, 0)
 	assert.Equal(t, uint32(2), resp.ConfToken())
 }
 
+func TestPrepareConfConcurrently(t *testing.T) {
+	InitConfCache(10 * time.Millisecond)
+
+	builder := flatbuffers.NewBuilder(1024)
+	pc.ReqStart(builder)
+	root := pc.ReqEnd(builder)
+	builder.Finish(root)
+	b := builder.FinishedBytes()
+
+	n := 10
+	var wg sync.WaitGroup
+	res := make([][]byte, n)
+	for i := 0; i < n; i++ {
+		wg.Add(1)
+		go func(i int) {
+			bd, err := PrepareConf(b)
+			assert.Nil(t, err)
+			res[i] = bd.FinishedBytes()[:]
+			wg.Done()
+		}(i)
+	}
+	wg.Wait()
+
+	tokens := make([]int, n)
+	for i := 0; i < n; i++ {
+		resp := pc.GetRootAsResp(res[i], 0)
+		tokens[i] = int(resp.ConfToken())
+	}
+
+	sort.Ints(tokens)
+	for i := 0; i < n; i++ {
+		assert.Equal(t, i+1, tokens[i])
+	}
+}
+
 func TestGetRuleConf(t *testing.T) {
 	InitConfCache(1 * time.Millisecond)
 	builder := flatbuffers.NewBuilder(1024)
@@ -51,15 +90,16 @@ func TestGetRuleConf(t *testing.T) {
 	builder.Finish(root)
 	b := builder.FinishedBytes()
 
-	out, _ := PrepareConf(b)
+	bd, _ := PrepareConf(b)
+	out := bd.FinishedBytes()
 	resp := pc.GetRootAsResp(out, 0)
-	assert.Equal(t, uint32(3), resp.ConfToken())
+	assert.Equal(t, uint32(1), resp.ConfToken())
 
-	res, _ := GetRuleConf(3)
+	res, _ := GetRuleConf(1)
 	assert.Equal(t, 0, len(res))
 
 	time.Sleep(2 * time.Millisecond)
-	_, err := GetRuleConf(3)
+	_, err := GetRuleConf(1)
 	assert.Equal(t, ttlcache.ErrNotFound, err)
 }
 
@@ -85,7 +125,7 @@ func TestGetRuleConfCheckConf(t *testing.T) {
 	b := builder.FinishedBytes()
 
 	PrepareConf(b)
-	res, _ := GetRuleConf(4)
+	res, _ := GetRuleConf(1)
 	assert.Equal(t, 1, len(res))
 	assert.Equal(t, "echo", res[0].Name)
 }
diff --git a/internal/server/error.go b/internal/server/error.go
index 8ba517c..81d77c3 100644
--- a/internal/server/error.go
+++ b/internal/server/error.go
@@ -18,15 +18,12 @@ import (
 	"github.com/ReneKroon/ttlcache/v2"
 	A6Err "github.com/api7/ext-plugin-proto/go/A6/Err"
 	flatbuffers "github.com/google/flatbuffers/go"
-)
 
-var (
-	builder = flatbuffers.NewBuilder(256)
+	"github.com/apache/apisix-go-plugin-runner/internal/util"
 )
 
-func ReportError(err error) []byte {
-	builder.Reset()
-
+func ReportError(err error) *flatbuffers.Builder {
+	builder := util.GetBuilder()
 	A6Err.RespStart(builder)
 
 	var code A6Err.Code
@@ -40,5 +37,5 @@ func ReportError(err error) []byte {
 	A6Err.RespAddCode(builder, code)
 	resp := A6Err.RespEnd(builder)
 	builder.Finish(resp)
-	return builder.FinishedBytes()
+	return builder
 }
diff --git a/internal/server/error_test.go b/internal/server/error_test.go
index 4d8bb21..fb5b426 100644
--- a/internal/server/error_test.go
+++ b/internal/server/error_test.go
@@ -29,12 +29,14 @@ func TestReportErrorCacheToken(t *testing.T) {
 
 	_, err := plugin.GetRuleConf(uint32(999999))
 	b := ReportError(err)
-	resp := A6Err.GetRootAsResp(b, 0)
+	out := b.FinishedBytes()
+	resp := A6Err.GetRootAsResp(out, 0)
 	assert.Equal(t, A6Err.CodeCONF_TOKEN_NOT_FOUND, resp.Code())
 }
 
 func TestReportErrorUnknownErr(t *testing.T) {
 	b := ReportError(io.EOF)
-	resp := A6Err.GetRootAsResp(b, 0)
+	out := b.FinishedBytes()
+	resp := A6Err.GetRootAsResp(out, 0)
 	assert.Equal(t, A6Err.CodeSERVICE_UNAVAILABLE, resp.Code())
 }
diff --git a/internal/server/server.go b/internal/server/server.go
index c2deef9..52c5624 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -28,6 +28,8 @@ import (
 
 	"github.com/apache/apisix-go-plugin-runner/internal/log"
 	"github.com/apache/apisix-go-plugin-runner/internal/plugin"
+	"github.com/apache/apisix-go-plugin-runner/internal/util"
+	flatbuffers "github.com/google/flatbuffers/go"
 )
 
 const (
@@ -92,14 +94,15 @@ func handleConn(c net.Conn) {
 			break
 		}
 
-		var out []byte
+		var bd *flatbuffers.Builder
 		switch ty {
 		case RPCPrepareConf:
-			out, err = plugin.PrepareConf(buf)
+			bd, err = plugin.PrepareConf(buf)
 		default:
 			err = fmt.Errorf("unknown type %d", ty)
 		}
 
+		out := bd.FinishedBytes()
 		size := len(out)
 		if size > MaxDataSize {
 			err = fmt.Errorf("the max length of data is %d but got %d", MaxDataSize, size)
@@ -108,7 +111,9 @@ func handleConn(c net.Conn) {
 
 		if err != nil {
 			ty = RPCError
-			out = ReportError(err)
+			util.PutBuilder(bd)
+			bd = ReportError(err)
+			out = bd.FinishedBytes()
 		}
 
 		binary.BigEndian.PutUint32(header, uint32(size))
@@ -117,14 +122,17 @@ func handleConn(c net.Conn) {
 		n, err = c.Write(header)
 		if err != nil {
 			writeErr(n, err)
+			util.PutBuilder(bd)
 			break
 		}
 
 		n, err = c.Write(out)
 		if err != nil {
 			writeErr(n, err)
+			util.PutBuilder(bd)
 			break
 		}
+		util.PutBuilder(bd)
 	}
 }
 
diff --git a/internal/server/error.go b/internal/util/pool.go
similarity index 62%
copy from internal/server/error.go
copy to internal/util/pool.go
index 8ba517c..bf6a746 100644
--- a/internal/server/error.go
+++ b/internal/util/pool.go
@@ -12,33 +12,25 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-package server
+package util
 
 import (
-	"github.com/ReneKroon/ttlcache/v2"
-	A6Err "github.com/api7/ext-plugin-proto/go/A6/Err"
-	flatbuffers "github.com/google/flatbuffers/go"
-)
+	"sync"
 
-var (
-	builder = flatbuffers.NewBuilder(256)
+	flatbuffers "github.com/google/flatbuffers/go"
 )
 
-func ReportError(err error) []byte {
-	builder.Reset()
-
-	A6Err.RespStart(builder)
+var builderPool = sync.Pool{
+	New: func() interface{} {
+		return flatbuffers.NewBuilder(256)
+	},
+}
 
-	var code A6Err.Code
-	switch err {
-	case ttlcache.ErrNotFound:
-		code = A6Err.CodeCONF_TOKEN_NOT_FOUND
-	default:
-		code = A6Err.CodeSERVICE_UNAVAILABLE
-	}
+func GetBuilder() *flatbuffers.Builder {
+	return builderPool.Get().(*flatbuffers.Builder)
+}
 
-	A6Err.RespAddCode(builder, code)
-	resp := A6Err.RespEnd(builder)
-	builder.Finish(resp)
-	return builder.FinishedBytes()
+func PutBuilder(b *flatbuffers.Builder) {
+	b.Reset()
+	builderPool.Put(b)
 }