You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2023/03/29 03:10:37 UTC
[servicecomb-kie] branch master updated: [fix] fix inconsistency bug between cache layer and etcd. (#287)
This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git
The following commit(s) were added to refs/heads/master by this push:
new e005d80 [fix] fix inconsistency bug between cache layer and etcd. (#287)
e005d80 is described below
commit e005d80afeab09163289feac644dfd6a2f12fa41
Author: kkf1 <46...@users.noreply.github.com>
AuthorDate: Wed Mar 29 11:10:31 2023 +0800
[fix] fix inconsistency bug between cache layer and etcd. (#287)
* [fix] fix inconsistency bug between cache layer and etcd.
* [fix] modify review comments.
* [fix] add ut for kv_cache.go.
---
examples/dev/kie-conf.yaml | 8 +-
server/config/struct.go | 11 +-
server/datasource/etcd/kv/kv_cache.go | 152 +++++++++-------
server/datasource/etcd/kv/kv_cache_test.go | 270 ++++++++++++++++++++++++++---
4 files changed, 346 insertions(+), 95 deletions(-)
diff --git a/examples/dev/kie-conf.yaml b/examples/dev/kie-conf.yaml
index 11b9106..d4aabb1 100644
--- a/examples/dev/kie-conf.yaml
+++ b/examples/dev/kie-conf.yaml
@@ -18,4 +18,10 @@ db:
# rsaPublicKeyFile: ./examples/dev/public.key
sync:
# when the synchronization switch is turned on, related operations will be written to the task in the db
- enabled: false
\ No newline at end of file
+ enabled: false
+#cache:
+# labels:
+# - environment
+# - service
+# - app
+# - version
diff --git a/server/config/struct.go b/server/config/struct.go
index 83e91f0..44669de 100644
--- a/server/config/struct.go
+++ b/server/config/struct.go
@@ -19,9 +19,10 @@ package config
// Config is yaml file struct
type Config struct {
- DB DB `yaml:"db"`
- RBAC RBAC `yaml:"rbac"`
- Sync Sync `yaml:"sync"`
+ DB DB `yaml:"db"`
+ RBAC RBAC `yaml:"rbac"`
+ Sync Sync `yaml:"sync"`
+ Cache Cache `yaml:"cache"`
//config from cli
ConfigFile string
NodeName string
@@ -59,3 +60,7 @@ type RBAC struct {
type Sync struct {
Enabled bool `yaml:"enabled"`
}
+
+type Cache struct {
+ Labels []string `yaml:"labels"`
+}
diff --git a/server/datasource/etcd/kv/kv_cache.go b/server/datasource/etcd/kv/kv_cache.go
index 5776bf7..2930a99 100644
--- a/server/datasource/etcd/kv/kv_cache.go
+++ b/server/datasource/etcd/kv/kv_cache.go
@@ -11,6 +11,7 @@ import (
"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/pkg/stringutil"
+ "github.com/apache/servicecomb-kie/server/config"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/datasource/etcd/key"
"github.com/go-chassis/foundation/backoff"
@@ -37,21 +38,27 @@ const (
type IDSet map[string]struct{}
-type Cache struct {
- timeOut time.Duration
- client etcdadpt.Client
- revision int64
- kvIDCache sync.Map
- kvDocCache *goCache.Cache
+type LabelsSet map[string]struct{}
+
+type CacheSearchReq struct {
+ Domain string
+ Project string
+ Opts *datasource.FindOptions
+ Regex *regexp.Regexp
}
func NewKvCache() *Cache {
kvDocCache := goCache.New(cacheExpirationTime, cacheCleanupInterval)
+ labelsSet := LabelsSet{}
+ for _, label := range config.Configurations.Cache.Labels {
+ labelsSet[label] = struct{}{}
+ }
return &Cache{
timeOut: etcdWatchTimeout,
client: etcdadpt.Instance(),
revision: 0,
kvDocCache: kvDocCache,
+ labelsSet: labelsSet,
}
}
@@ -59,11 +66,13 @@ func Enabled() bool {
return kvCache != nil
}
-type CacheSearchReq struct {
- Domain string
- Project string
- Opts *datasource.FindOptions
- Regex *regexp.Regexp
+type Cache struct {
+ timeOut time.Duration
+ client etcdadpt.Client
+ revision int64
+ kvIDCache sync.Map
+ kvDocCache *goCache.Cache
+ labelsSet LabelsSet
}
func (kc *Cache) Refresh(ctx context.Context) {
@@ -130,7 +139,7 @@ func (kc *Cache) list(ctx context.Context) (*etcdadpt.Response, error) {
return rsp, nil
}
-func (kc *Cache) watchCallBack(message string, rsp *etcdadpt.Response) error {
+func (kc *Cache) watchCallBack(_ string, rsp *etcdadpt.Response) error {
if rsp == nil || len(rsp.Kvs) == 0 {
return fmt.Errorf("unknown event")
}
@@ -154,6 +163,9 @@ func (kc *Cache) cachePut(rsp *etcdadpt.Response) {
openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err))
continue
}
+ if !kc.isInLabelsSet(kvDoc.Labels) {
+ continue
+ }
kc.StoreKvDoc(kvDoc.ID, kvDoc)
cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels)
m, ok := kc.LoadKvIDSet(cacheKey)
@@ -220,46 +232,6 @@ func (kc *Cache) DeleteKvDoc(kvID string) {
kc.kvDocCache.Delete(kvID)
}
-func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) {
- if !req.Opts.ExactLabels {
- return nil, false, nil
- }
-
- openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts))
- result := &model.KVResponse{
- Data: []*model.KVDoc{},
- }
- cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels)
- kvIds, ok := kvCache.LoadKvIDSet(cacheKey)
- if !ok {
- kvCache.StoreKvIDSet(cacheKey, IDSet{})
- return result, true, nil
- }
-
- var docs []*model.KVDoc
-
- var kvIdsLeft []string
- for kvID := range kvIds {
- if doc, ok := kvCache.LoadKvDoc(kvID); ok {
- docs = append(docs, doc)
- continue
- }
- kvIdsLeft = append(kvIdsLeft, kvID)
- }
-
- tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
- docs = append(docs, tpData...)
-
- for _, doc := range docs {
- if isMatch(req, doc) {
- datasource.ClearPart(doc)
- result.Data = append(result.Data, doc)
- }
- }
- result.Total = len(result.Data)
- return result, true, nil
-}
-
func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) []*model.KVDoc {
if len(kvIdsLeft) == 0 {
return nil
@@ -294,19 +266,6 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe
return docs
}
-func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
- if doc == nil {
- return false
- }
- if req.Opts.Status != "" && doc.Status != req.Opts.Status {
- return false
- }
- if req.Regex != nil && !req.Regex.MatchString(doc.Key) {
- return false
- }
- return true
-}
-
func (kc *Cache) GetKvDoc(kv *mvccpb.KeyValue) (*model.KVDoc, error) {
kvDoc := &model.KVDoc{}
err := json.Unmarshal(kv.Value, kvDoc)
@@ -326,3 +285,66 @@ func (kc *Cache) GetCacheKey(domain, project string, labels map[string]string) s
}, "/")
return inputKey
}
+
+func (kc *Cache) isInLabelsSet(Labels map[string]string) bool {
+ for label := range Labels {
+ if _, ok := kc.labelsSet[label]; !ok {
+ return false
+ }
+ }
+ return true
+}
+
+func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) {
+ result := &model.KVResponse{
+ Data: []*model.KVDoc{},
+ }
+ if !req.Opts.ExactLabels || !kvCache.isInLabelsSet(req.Opts.Labels) {
+ return result, false, nil
+ }
+
+ openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts))
+ cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels)
+
+ kvIds, ok := kvCache.LoadKvIDSet(cacheKey)
+ if !ok {
+ kvCache.StoreKvIDSet(cacheKey, IDSet{})
+ return result, true, nil
+ }
+
+ var docs []*model.KVDoc
+
+ var kvIdsLeft []string
+ for kvID := range kvIds {
+ if doc, ok := kvCache.LoadKvDoc(kvID); ok {
+ docs = append(docs, doc)
+ continue
+ }
+ kvIdsLeft = append(kvIdsLeft, kvID)
+ }
+
+ tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
+ docs = append(docs, tpData...)
+
+ for _, doc := range docs {
+ if isMatch(req, doc) {
+ datasource.ClearPart(doc)
+ result.Data = append(result.Data, doc)
+ }
+ }
+ result.Total = len(result.Data)
+ return result, true, nil
+}
+
+func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
+ if doc == nil {
+ return false
+ }
+ if req.Opts.Status != "" && doc.Status != req.Opts.Status {
+ return false
+ }
+ if req.Regex != nil && !req.Regex.MatchString(doc.Key) {
+ return false
+ }
+ return true
+}
diff --git a/server/datasource/etcd/kv/kv_cache_test.go b/server/datasource/etcd/kv/kv_cache_test.go
index 2286699..c128b5a 100644
--- a/server/datasource/etcd/kv/kv_cache_test.go
+++ b/server/datasource/etcd/kv/kv_cache_test.go
@@ -1,46 +1,67 @@
package kv
import (
+ "fmt"
+ "reflect"
"testing"
+ "time"
+ "github.com/apache/servicecomb-kie/pkg/model"
+ "github.com/apache/servicecomb-kie/server/config"
"github.com/little-cui/etcdadpt"
+ goCache "github.com/patrickmn/go-cache"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/api/v3/mvccpb"
)
-type args struct {
- rsp *etcdadpt.Response
+func init() {
+ config.Configurations.Cache.Labels = []string{"environment"}
}
func TestCachePut(t *testing.T) {
+ type args struct {
+ rsp *etcdadpt.Response
+ }
tests := []struct {
name string
args args
want int
}{
- {"put 0 kvDoc, cache should store 0 kvDoc",
- args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{}}},
- 0,
+ {
+ name: "put 0 kvDoc, cache should store 0 kvDoc",
+ args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{}}},
+ want: 0,
},
- {"put 1 kvDoc, cache should store 1 kvDoc",
- args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
+ {
+ name: "put 1 kvDoc, cache should store 1 kvDoc",
+ args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)},
}}},
- 1,
+ want: 1,
},
- {"put 2 kvDocs with different kvIds, cache should store 2 kvDocs",
- args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
+ {
+ name: "put 2 kvDocs with different kvIds, cache should store 2 kvDocs",
+ args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)},
{Value: []byte(`{"id":"2", "key":"withToys", "value":"yes", "labels":{"environment":"testing"}}`)},
}}},
- 2,
+ want: 2,
},
- {"put 2 kvDocs with same kvId, cache should store 1 kvDocs",
- args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
+ {
+ name: "put 2 kvDocs with same kvId, cache should store 1 kvDocs",
+ args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)},
{Value: []byte(`{"id":"1", "key":"withToys", "value":"yes", "labels":{"environment":"testing"}}`)},
}}},
- 1,
+ want: 1,
+ },
+ {
+ name: "put 2 kvDoc, but labels are not cached, cache should store 0 kvDoc",
+ args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
+ {Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"env":"testing"}}`)},
+ {Value: []byte(`{"id":"1", "key":"withToys", "value":"yes", "labels":{"env":"testing"}}`)},
+ }}},
+ want: 0,
},
}
for _, tt := range tests {
@@ -54,33 +75,40 @@ func TestCachePut(t *testing.T) {
}
func TestCacheDelete(t *testing.T) {
+ type args struct {
+ rsp *etcdadpt.Response
+ }
tests := []struct {
name string
args args
want int
}{
- {"first put 2 kvDocs, then delete 0 kvDoc, cache should store 2 kvDocs",
- args{&etcdadpt.Response{Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{}}},
- 2,
+ {
+ name: "first put 2 kvDocs, then delete 0 kvDoc, cache should store 2 kvDocs",
+ args: args{&etcdadpt.Response{Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{}}},
+ want: 2,
},
- {"first put 2 kvDocs, then delete kvId=1, cache should store 1 kvDocs",
- args{&etcdadpt.Response{Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{
+ {
+ name: "first put 2 kvDocs, then delete kvId=1, cache should store 1 kvDocs",
+ args: args{&etcdadpt.Response{Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{
{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)},
}}},
- 1,
+ want: 1,
},
- {"first put 2 kvDocs, then delete kvId=1 and kvId=2, cache should store 0 kvDocs",
- args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
+ {
+ name: "first put 2 kvDocs, then delete kvId=1 and kvId=2, cache should store 0 kvDocs",
+ args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)},
{Value: []byte(`{"id":"2", "key":"withToys", "value":"yes", "labels":{"environment":"testing"}}`)},
}}},
- 0,
+ want: 0,
},
- {"first put 2 kvDocs, then delete non-exist kvId=0, cache should store 2 kvDocs",
- args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
+ {
+ name: "first put 2 kvDocs, then delete non-exist kvId=0, cache should store 2 kvDocs",
+ args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
{Value: []byte(`{"id":"0", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)},
}}},
- 2,
+ want: 2,
},
}
for _, tt := range tests {
@@ -96,3 +124,193 @@ func TestCacheDelete(t *testing.T) {
})
}
}
+
+func TestWatchCallBack(t *testing.T) {
+ type args struct {
+ rsp []*etcdadpt.Response
+ }
+ type want struct {
+ kvNum int
+ err error
+ }
+ tests := []struct {
+ name string
+ args args
+ want want
+ }{
+ {
+ name: "receive 2 messages without kvs, expected: error is not nil, cache should store 0 kvDoc",
+ args: args{
+ rsp: []*etcdadpt.Response{
+ {
+ Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{},
+ },
+
+ {
+ Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{},
+ },
+ },
+ },
+ want: want{
+ kvNum: 0,
+ err: fmt.Errorf("unknown event"),
+ },
+ },
+ {
+ name: "receive 1 put message, put 0 kvDoc, expected: error is not nil, cache should store 0 kvDoc",
+ args: args{
+ rsp: []*etcdadpt.Response{
+ {
+ Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{},
+ },
+ },
+ },
+ want: want{
+ kvNum: 0,
+ err: fmt.Errorf("unknown event"),
+ },
+ },
+ {
+ name: "receive 1 delete message, delete 0 kvDoc, expected: error is not nil, cache should store 0 kvDoc",
+ args: args{
+ rsp: []*etcdadpt.Response{{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{}}},
+ },
+ want: want{
+ kvNum: 0,
+ err: fmt.Errorf("unknown event"),
+ },
+ },
+ {
+ name: "receive put message, put 1 kvDocs, expected: error is nil, cache should store 1 kvDocs",
+ args: args{
+ rsp: []*etcdadpt.Response{
+ {
+ Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}}},
+ },
+ },
+ want: want{
+ kvNum: 1,
+ err: nil,
+ },
+ },
+ {
+ name: "receive 1 put message, 1 delete message, first put 1 kvDoc, then delete it, expected: error is nil, cache should store 0 kvDoc",
+ args: args{
+ rsp: []*etcdadpt.Response{
+ {
+ Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}},
+ },
+ {
+ Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}},
+ },
+ },
+ },
+ want: want{
+ kvNum: 0,
+ err: nil,
+ },
+ },
+ {
+ name: "receive put message put 1 kvDoc, but labels are not cached, cache should store 0 kvDoc",
+ args: args{
+ []*etcdadpt.Response{
+ {
+ Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"env":"testing"}}`)}},
+ },
+ },
+ },
+ want: want{
+ kvNum: 0,
+ err: nil,
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ kc := NewKvCache()
+ for _, rsp := range tt.args.rsp {
+ err := kc.watchCallBack("", rsp)
+ assert.Equal(t, tt.want.err, err)
+ }
+ num := kc.kvDocCache.ItemCount()
+ assert.Equal(t, tt.want.kvNum, num)
+ })
+ }
+}
+
+func TestStoreAndLoadKvDoc(t *testing.T) {
+ type want struct {
+ kvDoc *model.KVDoc
+ exist bool
+ }
+ type args struct {
+ kvID string
+ kvDoc *model.KVDoc
+ expireTime time.Duration
+ waitTimeAfterStore time.Duration
+ }
+ tests := []struct {
+ name string
+ args args
+ want want
+ }{
+ {
+ name: "store 1 kv and the expire time is 1 seconds, then load the kv with no wait time, expect: load kv successfully",
+ args: args{
+ kvID: "",
+ kvDoc: &model.KVDoc{
+ ID: "1",
+ Key: "withFood",
+ Value: "yes",
+ Labels: map[string]string{
+ "environment": "testing",
+ },
+ },
+ expireTime: 1 * time.Second,
+ waitTimeAfterStore: 0,
+ },
+ want: want{
+ kvDoc: &model.KVDoc{
+ ID: "1",
+ Key: "withFood",
+ Value: "yes",
+ Labels: map[string]string{
+ "environment": "testing",
+ },
+ },
+ exist: true,
+ },
+ },
+ {
+ name: "store 1 kv and the expire time is 1 seconds, after waiting 2 seconds, then load the kv, expect: unable to load the kv",
+ args: args{
+ kvID: "",
+ kvDoc: &model.KVDoc{
+ ID: "1",
+ Key: "withFood",
+ Value: "yes",
+ Labels: map[string]string{
+ "environment": "testing",
+ },
+ },
+ expireTime: 1 * time.Second,
+ waitTimeAfterStore: 2 * time.Second,
+ },
+ want: want{
+ kvDoc: nil,
+ exist: false,
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ kc := NewKvCache()
+ kc.kvDocCache = goCache.New(tt.args.expireTime, tt.args.expireTime)
+ kc.StoreKvDoc(tt.args.kvID, tt.args.kvDoc)
+ time.Sleep(tt.args.waitTimeAfterStore)
+ doc, exist := kc.LoadKvDoc(tt.args.kvID)
+ assert.Equal(t, tt.want.exist, exist)
+ reflect.DeepEqual(tt.want.kvDoc, doc)
+ })
+ }
+}