You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2021/05/31 02:07:16 UTC
[apisix-go-plugin-runner] 09/22: fix: concurrent issues
This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-go-plugin-runner.git
commit e3d2617b44efc9268ac19a0498aea82c03ed567b
Author: spacewander <sp...@gmail.com>
AuthorDate: Thu May 20 17:01:05 2021 +0800
fix: concurrent issues
---
internal/plugin/conf.go | 19 ++++-------
internal/plugin/conf_test.go | 54 ++++++++++++++++++++++++++----
internal/server/error.go | 11 +++---
internal/server/error_test.go | 6 ++--
internal/server/server.go | 14 ++++++--
internal/{server/error.go => util/pool.go} | 36 ++++++++------------
6 files changed, 87 insertions(+), 53 deletions(-)
diff --git a/internal/plugin/conf.go b/internal/plugin/conf.go
index 6c79cb6..8e18ecc 100644
--- a/internal/plugin/conf.go
+++ b/internal/plugin/conf.go
@@ -16,9 +16,11 @@ package plugin
import (
"strconv"
+ "sync/atomic"
"time"
"github.com/ReneKroon/ttlcache/v2"
+ "github.com/apache/apisix-go-plugin-runner/internal/util"
A6 "github.com/api7/ext-plugin-proto/go/A6"
pc "github.com/api7/ext-plugin-proto/go/A6/PrepareConf"
flatbuffers "github.com/google/flatbuffers/go"
@@ -31,8 +33,6 @@ type ConfEntry struct {
type RuleConf []ConfEntry
var (
- builder = flatbuffers.NewBuilder(1024)
-
cache *ttlcache.Cache
cacheCounter uint32 = 0
)
@@ -41,20 +41,14 @@ func InitConfCache(ttl time.Duration) {
cache = ttlcache.NewCache()
cache.SetTTL(ttl)
cache.SkipTTLExtensionOnHit(false)
+ cacheCounter = 0
}
func genCacheToken() uint32 {
- cacheCounter++
- if cacheCounter == 0 {
- // overflow, skip 0 which means none
- cacheCounter++
- }
- return cacheCounter
+ return atomic.AddUint32(&cacheCounter, 1)
}
-func PrepareConf(buf []byte) ([]byte, error) {
- builder.Reset()
-
+func PrepareConf(buf []byte) (*flatbuffers.Builder, error) {
req := pc.GetRootAsReq(buf, 0)
entries := make(RuleConf, req.ConfLength())
@@ -72,11 +66,12 @@ func PrepareConf(buf []byte) ([]byte, error) {
return nil, err
}
+ builder := util.GetBuilder()
pc.RespStart(builder)
pc.RespAddConfToken(builder, token)
root := pc.RespEnd(builder)
builder.Finish(root)
- return builder.FinishedBytes(), nil
+ return builder, nil
}
func GetRuleConf(token uint32) (RuleConf, error) {
diff --git a/internal/plugin/conf_test.go b/internal/plugin/conf_test.go
index b4135a5..aa2ded4 100644
--- a/internal/plugin/conf_test.go
+++ b/internal/plugin/conf_test.go
@@ -15,6 +15,8 @@
package plugin
import (
+ "sort"
+ "sync"
"testing"
"time"
@@ -34,15 +36,52 @@ func TestPrepareConf(t *testing.T) {
builder.Finish(root)
b := builder.FinishedBytes()
- out, _ := PrepareConf(b)
+ bd, _ := PrepareConf(b)
+ out := bd.FinishedBytes()
resp := pc.GetRootAsResp(out, 0)
assert.Equal(t, uint32(1), resp.ConfToken())
- out, _ = PrepareConf(b)
+ bd, _ = PrepareConf(b)
+ out = bd.FinishedBytes()
resp = pc.GetRootAsResp(out, 0)
assert.Equal(t, uint32(2), resp.ConfToken())
}
+func TestPrepareConfConcurrently(t *testing.T) {
+ InitConfCache(10 * time.Millisecond)
+
+ builder := flatbuffers.NewBuilder(1024)
+ pc.ReqStart(builder)
+ root := pc.ReqEnd(builder)
+ builder.Finish(root)
+ b := builder.FinishedBytes()
+
+ n := 10
+ var wg sync.WaitGroup
+ res := make([][]byte, n)
+ for i := 0; i < n; i++ {
+ wg.Add(1)
+ go func(i int) {
+ bd, err := PrepareConf(b)
+ assert.Nil(t, err)
+ res[i] = bd.FinishedBytes()[:]
+ wg.Done()
+ }(i)
+ }
+ wg.Wait()
+
+ tokens := make([]int, n)
+ for i := 0; i < n; i++ {
+ resp := pc.GetRootAsResp(res[i], 0)
+ tokens[i] = int(resp.ConfToken())
+ }
+
+ sort.Ints(tokens)
+ for i := 0; i < n; i++ {
+ assert.Equal(t, i+1, tokens[i])
+ }
+}
+
func TestGetRuleConf(t *testing.T) {
InitConfCache(1 * time.Millisecond)
builder := flatbuffers.NewBuilder(1024)
@@ -51,15 +90,16 @@ func TestGetRuleConf(t *testing.T) {
builder.Finish(root)
b := builder.FinishedBytes()
- out, _ := PrepareConf(b)
+ bd, _ := PrepareConf(b)
+ out := bd.FinishedBytes()
resp := pc.GetRootAsResp(out, 0)
- assert.Equal(t, uint32(3), resp.ConfToken())
+ assert.Equal(t, uint32(1), resp.ConfToken())
- res, _ := GetRuleConf(3)
+ res, _ := GetRuleConf(1)
assert.Equal(t, 0, len(res))
time.Sleep(2 * time.Millisecond)
- _, err := GetRuleConf(3)
+ _, err := GetRuleConf(1)
assert.Equal(t, ttlcache.ErrNotFound, err)
}
@@ -85,7 +125,7 @@ func TestGetRuleConfCheckConf(t *testing.T) {
b := builder.FinishedBytes()
PrepareConf(b)
- res, _ := GetRuleConf(4)
+ res, _ := GetRuleConf(1)
assert.Equal(t, 1, len(res))
assert.Equal(t, "echo", res[0].Name)
}
diff --git a/internal/server/error.go b/internal/server/error.go
index 8ba517c..81d77c3 100644
--- a/internal/server/error.go
+++ b/internal/server/error.go
@@ -18,15 +18,12 @@ import (
"github.com/ReneKroon/ttlcache/v2"
A6Err "github.com/api7/ext-plugin-proto/go/A6/Err"
flatbuffers "github.com/google/flatbuffers/go"
-)
-var (
- builder = flatbuffers.NewBuilder(256)
+ "github.com/apache/apisix-go-plugin-runner/internal/util"
)
-func ReportError(err error) []byte {
- builder.Reset()
-
+func ReportError(err error) *flatbuffers.Builder {
+ builder := util.GetBuilder()
A6Err.RespStart(builder)
var code A6Err.Code
@@ -40,5 +37,5 @@ func ReportError(err error) []byte {
A6Err.RespAddCode(builder, code)
resp := A6Err.RespEnd(builder)
builder.Finish(resp)
- return builder.FinishedBytes()
+ return builder
}
diff --git a/internal/server/error_test.go b/internal/server/error_test.go
index 4d8bb21..fb5b426 100644
--- a/internal/server/error_test.go
+++ b/internal/server/error_test.go
@@ -29,12 +29,14 @@ func TestReportErrorCacheToken(t *testing.T) {
_, err := plugin.GetRuleConf(uint32(999999))
b := ReportError(err)
- resp := A6Err.GetRootAsResp(b, 0)
+ out := b.FinishedBytes()
+ resp := A6Err.GetRootAsResp(out, 0)
assert.Equal(t, A6Err.CodeCONF_TOKEN_NOT_FOUND, resp.Code())
}
func TestReportErrorUnknownErr(t *testing.T) {
b := ReportError(io.EOF)
- resp := A6Err.GetRootAsResp(b, 0)
+ out := b.FinishedBytes()
+ resp := A6Err.GetRootAsResp(out, 0)
assert.Equal(t, A6Err.CodeSERVICE_UNAVAILABLE, resp.Code())
}
diff --git a/internal/server/server.go b/internal/server/server.go
index c2deef9..52c5624 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -28,6 +28,8 @@ import (
"github.com/apache/apisix-go-plugin-runner/internal/log"
"github.com/apache/apisix-go-plugin-runner/internal/plugin"
+ "github.com/apache/apisix-go-plugin-runner/internal/util"
+ flatbuffers "github.com/google/flatbuffers/go"
)
const (
@@ -92,14 +94,15 @@ func handleConn(c net.Conn) {
break
}
- var out []byte
+ var bd *flatbuffers.Builder
switch ty {
case RPCPrepareConf:
- out, err = plugin.PrepareConf(buf)
+ bd, err = plugin.PrepareConf(buf)
default:
err = fmt.Errorf("unknown type %d", ty)
}
+ out := bd.FinishedBytes()
size := len(out)
if size > MaxDataSize {
err = fmt.Errorf("the max length of data is %d but got %d", MaxDataSize, size)
@@ -108,7 +111,9 @@ func handleConn(c net.Conn) {
if err != nil {
ty = RPCError
- out = ReportError(err)
+ util.PutBuilder(bd)
+ bd = ReportError(err)
+ out = bd.FinishedBytes()
}
binary.BigEndian.PutUint32(header, uint32(size))
@@ -117,14 +122,17 @@ func handleConn(c net.Conn) {
n, err = c.Write(header)
if err != nil {
writeErr(n, err)
+ util.PutBuilder(bd)
break
}
n, err = c.Write(out)
if err != nil {
writeErr(n, err)
+ util.PutBuilder(bd)
break
}
+ util.PutBuilder(bd)
}
}
diff --git a/internal/server/error.go b/internal/util/pool.go
similarity index 62%
copy from internal/server/error.go
copy to internal/util/pool.go
index 8ba517c..bf6a746 100644
--- a/internal/server/error.go
+++ b/internal/util/pool.go
@@ -12,33 +12,25 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package server
+package util
import (
- "github.com/ReneKroon/ttlcache/v2"
- A6Err "github.com/api7/ext-plugin-proto/go/A6/Err"
- flatbuffers "github.com/google/flatbuffers/go"
-)
+ "sync"
-var (
- builder = flatbuffers.NewBuilder(256)
+ flatbuffers "github.com/google/flatbuffers/go"
)
-func ReportError(err error) []byte {
- builder.Reset()
-
- A6Err.RespStart(builder)
+var builderPool = sync.Pool{
+ New: func() interface{} {
+ return flatbuffers.NewBuilder(256)
+ },
+}
- var code A6Err.Code
- switch err {
- case ttlcache.ErrNotFound:
- code = A6Err.CodeCONF_TOKEN_NOT_FOUND
- default:
- code = A6Err.CodeSERVICE_UNAVAILABLE
- }
+func GetBuilder() *flatbuffers.Builder {
+ return builderPool.Get().(*flatbuffers.Builder)
+}
- A6Err.RespAddCode(builder, code)
- resp := A6Err.RespEnd(builder)
- builder.Finish(resp)
- return builder.FinishedBytes()
+func PutBuilder(b *flatbuffers.Builder) {
+ b.Reset()
+ builderPool.Put(b)
}