You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2023/03/29 03:10:37 UTC

[servicecomb-kie] branch master updated: [fix] fix inconsistency bug between cache layer and etcd. (#287)

This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git


The following commit(s) were added to refs/heads/master by this push:
     new e005d80  [fix] fix inconsistency bug between cache layer and etcd. (#287)
e005d80 is described below

commit e005d80afeab09163289feac644dfd6a2f12fa41
Author: kkf1 <46...@users.noreply.github.com>
AuthorDate: Wed Mar 29 11:10:31 2023 +0800

    [fix] fix inconsistency bug between cache layer and etcd. (#287)
    
    * [fix] fix inconsistency bug between cache layer and etcd.
    
    * [fix] modify review comments.
    
    * [fix] add ut for kv_cache.go.
---
 examples/dev/kie-conf.yaml                 |   8 +-
 server/config/struct.go                    |  11 +-
 server/datasource/etcd/kv/kv_cache.go      | 152 +++++++++-------
 server/datasource/etcd/kv/kv_cache_test.go | 270 ++++++++++++++++++++++++++---
 4 files changed, 346 insertions(+), 95 deletions(-)

diff --git a/examples/dev/kie-conf.yaml b/examples/dev/kie-conf.yaml
index 11b9106..d4aabb1 100644
--- a/examples/dev/kie-conf.yaml
+++ b/examples/dev/kie-conf.yaml
@@ -18,4 +18,10 @@ db:
 #  rsaPublicKeyFile: ./examples/dev/public.key
 sync:
   # turn on the synchronization switch related operations will be written to the task in the db
-  enabled: false
\ No newline at end of file
+  enabled: false
+#cache:
+#  labels:
+#    - environment
+#    - service
+#    - app
+#    - version
diff --git a/server/config/struct.go b/server/config/struct.go
index 83e91f0..44669de 100644
--- a/server/config/struct.go
+++ b/server/config/struct.go
@@ -19,9 +19,10 @@ package config
 
 // Config is yaml file struct
 type Config struct {
-	DB   DB   `yaml:"db"`
-	RBAC RBAC `yaml:"rbac"`
-	Sync Sync `yaml:"sync"`
+	DB    DB    `yaml:"db"`
+	RBAC  RBAC  `yaml:"rbac"`
+	Sync  Sync  `yaml:"sync"`
+	Cache Cache `yaml:"cache"`
 	//config from cli
 	ConfigFile     string
 	NodeName       string
@@ -59,3 +60,7 @@ type RBAC struct {
 type Sync struct {
 	Enabled bool `yaml:"enabled"`
 }
+
+type Cache struct {
+	Labels []string `yaml:"labels"`
+}
diff --git a/server/datasource/etcd/kv/kv_cache.go b/server/datasource/etcd/kv/kv_cache.go
index 5776bf7..2930a99 100644
--- a/server/datasource/etcd/kv/kv_cache.go
+++ b/server/datasource/etcd/kv/kv_cache.go
@@ -11,6 +11,7 @@ import (
 
 	"github.com/apache/servicecomb-kie/pkg/model"
 	"github.com/apache/servicecomb-kie/pkg/stringutil"
+	"github.com/apache/servicecomb-kie/server/config"
 	"github.com/apache/servicecomb-kie/server/datasource"
 	"github.com/apache/servicecomb-kie/server/datasource/etcd/key"
 	"github.com/go-chassis/foundation/backoff"
@@ -37,21 +38,27 @@ const (
 
 type IDSet map[string]struct{}
 
-type Cache struct {
-	timeOut    time.Duration
-	client     etcdadpt.Client
-	revision   int64
-	kvIDCache  sync.Map
-	kvDocCache *goCache.Cache
+type LabelsSet map[string]struct{}
+
+type CacheSearchReq struct {
+	Domain  string
+	Project string
+	Opts    *datasource.FindOptions
+	Regex   *regexp.Regexp
 }
 
 func NewKvCache() *Cache {
 	kvDocCache := goCache.New(cacheExpirationTime, cacheCleanupInterval)
+	labelsSet := LabelsSet{}
+	for _, label := range config.Configurations.Cache.Labels {
+		labelsSet[label] = struct{}{}
+	}
 	return &Cache{
 		timeOut:    etcdWatchTimeout,
 		client:     etcdadpt.Instance(),
 		revision:   0,
 		kvDocCache: kvDocCache,
+		labelsSet:  labelsSet,
 	}
 }
 
@@ -59,11 +66,13 @@ func Enabled() bool {
 	return kvCache != nil
 }
 
-type CacheSearchReq struct {
-	Domain  string
-	Project string
-	Opts    *datasource.FindOptions
-	Regex   *regexp.Regexp
+type Cache struct {
+	timeOut    time.Duration
+	client     etcdadpt.Client
+	revision   int64
+	kvIDCache  sync.Map
+	kvDocCache *goCache.Cache
+	labelsSet  LabelsSet
 }
 
 func (kc *Cache) Refresh(ctx context.Context) {
@@ -130,7 +139,7 @@ func (kc *Cache) list(ctx context.Context) (*etcdadpt.Response, error) {
 	return rsp, nil
 }
 
-func (kc *Cache) watchCallBack(message string, rsp *etcdadpt.Response) error {
+func (kc *Cache) watchCallBack(_ string, rsp *etcdadpt.Response) error {
 	if rsp == nil || len(rsp.Kvs) == 0 {
 		return fmt.Errorf("unknown event")
 	}
@@ -154,6 +163,9 @@ func (kc *Cache) cachePut(rsp *etcdadpt.Response) {
 			openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err))
 			continue
 		}
+		if !kc.isInLabelsSet(kvDoc.Labels) {
+			continue
+		}
 		kc.StoreKvDoc(kvDoc.ID, kvDoc)
 		cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels)
 		m, ok := kc.LoadKvIDSet(cacheKey)
@@ -220,46 +232,6 @@ func (kc *Cache) DeleteKvDoc(kvID string) {
 	kc.kvDocCache.Delete(kvID)
 }
 
-func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) {
-	if !req.Opts.ExactLabels {
-		return nil, false, nil
-	}
-
-	openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts))
-	result := &model.KVResponse{
-		Data: []*model.KVDoc{},
-	}
-	cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels)
-	kvIds, ok := kvCache.LoadKvIDSet(cacheKey)
-	if !ok {
-		kvCache.StoreKvIDSet(cacheKey, IDSet{})
-		return result, true, nil
-	}
-
-	var docs []*model.KVDoc
-
-	var kvIdsLeft []string
-	for kvID := range kvIds {
-		if doc, ok := kvCache.LoadKvDoc(kvID); ok {
-			docs = append(docs, doc)
-			continue
-		}
-		kvIdsLeft = append(kvIdsLeft, kvID)
-	}
-
-	tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
-	docs = append(docs, tpData...)
-
-	for _, doc := range docs {
-		if isMatch(req, doc) {
-			datasource.ClearPart(doc)
-			result.Data = append(result.Data, doc)
-		}
-	}
-	result.Total = len(result.Data)
-	return result, true, nil
-}
-
 func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) []*model.KVDoc {
 	if len(kvIdsLeft) == 0 {
 		return nil
@@ -294,19 +266,6 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe
 	return docs
 }
 
-func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
-	if doc == nil {
-		return false
-	}
-	if req.Opts.Status != "" && doc.Status != req.Opts.Status {
-		return false
-	}
-	if req.Regex != nil && !req.Regex.MatchString(doc.Key) {
-		return false
-	}
-	return true
-}
-
 func (kc *Cache) GetKvDoc(kv *mvccpb.KeyValue) (*model.KVDoc, error) {
 	kvDoc := &model.KVDoc{}
 	err := json.Unmarshal(kv.Value, kvDoc)
@@ -326,3 +285,66 @@ func (kc *Cache) GetCacheKey(domain, project string, labels map[string]string) s
 	}, "/")
 	return inputKey
 }
+
+func (kc *Cache) isInLabelsSet(Labels map[string]string) bool {
+	for label := range Labels {
+		if _, ok := kc.labelsSet[label]; !ok {
+			return false
+		}
+	}
+	return true
+}
+
+func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) {
+	result := &model.KVResponse{
+		Data: []*model.KVDoc{},
+	}
+	if !req.Opts.ExactLabels || !kvCache.isInLabelsSet(req.Opts.Labels) {
+		return result, false, nil
+	}
+
+	openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts))
+	cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels)
+
+	kvIds, ok := kvCache.LoadKvIDSet(cacheKey)
+	if !ok {
+		kvCache.StoreKvIDSet(cacheKey, IDSet{})
+		return result, true, nil
+	}
+
+	var docs []*model.KVDoc
+
+	var kvIdsLeft []string
+	for kvID := range kvIds {
+		if doc, ok := kvCache.LoadKvDoc(kvID); ok {
+			docs = append(docs, doc)
+			continue
+		}
+		kvIdsLeft = append(kvIdsLeft, kvID)
+	}
+
+	tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
+	docs = append(docs, tpData...)
+
+	for _, doc := range docs {
+		if isMatch(req, doc) {
+			datasource.ClearPart(doc)
+			result.Data = append(result.Data, doc)
+		}
+	}
+	result.Total = len(result.Data)
+	return result, true, nil
+}
+
+func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
+	if doc == nil {
+		return false
+	}
+	if req.Opts.Status != "" && doc.Status != req.Opts.Status {
+		return false
+	}
+	if req.Regex != nil && !req.Regex.MatchString(doc.Key) {
+		return false
+	}
+	return true
+}
diff --git a/server/datasource/etcd/kv/kv_cache_test.go b/server/datasource/etcd/kv/kv_cache_test.go
index 2286699..c128b5a 100644
--- a/server/datasource/etcd/kv/kv_cache_test.go
+++ b/server/datasource/etcd/kv/kv_cache_test.go
@@ -1,46 +1,67 @@
 package kv
 
 import (
+	"fmt"
+	"reflect"
 	"testing"
+	"time"
 
+	"github.com/apache/servicecomb-kie/pkg/model"
+	"github.com/apache/servicecomb-kie/server/config"
 	"github.com/little-cui/etcdadpt"
+	goCache "github.com/patrickmn/go-cache"
 	"github.com/stretchr/testify/assert"
 	"go.etcd.io/etcd/api/v3/mvccpb"
 )
 
-type args struct {
-	rsp *etcdadpt.Response
+func init() {
+	config.Configurations.Cache.Labels = []string{"environment"}
 }
 
 func TestCachePut(t *testing.T) {
+	type args struct {
+		rsp *etcdadpt.Response
+	}
 	tests := []struct {
 		name string
 		args args
 		want int
 	}{
-		{"put 0 kvDoc, cache should store 0 kvDoc",
-			args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{}}},
-			0,
+		{
+			name: "put 0 kvDoc, cache should store 0 kvDoc",
+			args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{}}},
+			want: 0,
 		},
-		{"put 1 kvDoc, cache should store 1 kvDoc",
-			args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
+		{
+			name: "put 1 kvDoc, cache should store 1 kvDoc",
+			args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
 				{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)},
 			}}},
-			1,
+			want: 1,
 		},
-		{"put 2 kvDocs with different kvIds, cache should store 2 kvDocs",
-			args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
+		{
+			name: "put 2 kvDocs with different kvIds, cache should store 2 kvDocs",
+			args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
 				{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)},
 				{Value: []byte(`{"id":"2", "key":"withToys", "value":"yes", "labels":{"environment":"testing"}}`)},
 			}}},
-			2,
+			want: 2,
 		},
-		{"put 2 kvDocs with same kvId, cache should store 1 kvDocs",
-			args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
+		{
+			name: "put 2 kvDocs with same kvId, cache should store 1 kvDocs",
+			args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
 				{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)},
 				{Value: []byte(`{"id":"1", "key":"withToys", "value":"yes", "labels":{"environment":"testing"}}`)},
 			}}},
-			1,
+			want: 1,
+		},
+		{
+			name: "put 2 kvDoc, but labels are not cached, cache should store 0 kvDoc",
+			args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
+				{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"env":"testing"}}`)},
+				{Value: []byte(`{"id":"1", "key":"withToys", "value":"yes", "labels":{"env":"testing"}}`)},
+			}}},
+			want: 0,
 		},
 	}
 	for _, tt := range tests {
@@ -54,33 +75,40 @@ func TestCachePut(t *testing.T) {
 }
 
 func TestCacheDelete(t *testing.T) {
+	type args struct {
+		rsp *etcdadpt.Response
+	}
 	tests := []struct {
 		name string
 		args args
 		want int
 	}{
-		{"first put 2 kvDocs, then delete 0 kvDoc, cache should store 2 kvDocs",
-			args{&etcdadpt.Response{Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{}}},
-			2,
+		{
+			name: "first put 2 kvDocs, then delete 0 kvDoc, cache should store 2 kvDocs",
+			args: args{&etcdadpt.Response{Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{}}},
+			want: 2,
 		},
-		{"first put 2 kvDocs, then delete kvId=1, cache should store 1 kvDocs",
-			args{&etcdadpt.Response{Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{
+		{
+			name: "first put 2 kvDocs, then delete kvId=1, cache should store 1 kvDocs",
+			args: args{&etcdadpt.Response{Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{
 				{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)},
 			}}},
-			1,
+			want: 1,
 		},
-		{"first put 2 kvDocs, then delete kvId=1 and kvId=2, cache should store 0 kvDocs",
-			args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
+		{
+			name: "first put 2 kvDocs, then delete kvId=1 and kvId=2, cache should store 0 kvDocs",
+			args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
 				{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)},
 				{Value: []byte(`{"id":"2", "key":"withToys", "value":"yes", "labels":{"environment":"testing"}}`)},
 			}}},
-			0,
+			want: 0,
 		},
-		{"first put 2 kvDocs, then delete non-exist kvId=0, cache should store 2 kvDocs",
-			args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
+		{
+			name: "first put 2 kvDocs, then delete non-exist kvId=0, cache should store 2 kvDocs",
+			args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{
 				{Value: []byte(`{"id":"0", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)},
 			}}},
-			2,
+			want: 2,
 		},
 	}
 	for _, tt := range tests {
@@ -96,3 +124,193 @@ func TestCacheDelete(t *testing.T) {
 		})
 	}
 }
+
+func TestWatchCallBack(t *testing.T) {
+	type args struct {
+		rsp []*etcdadpt.Response
+	}
+	type want struct {
+		kvNum int
+		err   error
+	}
+	tests := []struct {
+		name string
+		args args
+		want want
+	}{
+		{
+			name: "receive 2 messages without kvs, expected: error is not nil, cache should store 0 kvDoc",
+			args: args{
+				rsp: []*etcdadpt.Response{
+					{
+						Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{},
+					},
+
+					{
+						Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{},
+					},
+				},
+			},
+			want: want{
+				kvNum: 0,
+				err:   fmt.Errorf("unknown event"),
+			},
+		},
+		{
+			name: "receive 1 put message, put 0 kvDoc, expected: error is not nil, cache should store 0 kvDoc",
+			args: args{
+				rsp: []*etcdadpt.Response{
+					{
+						Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{},
+					},
+				},
+			},
+			want: want{
+				kvNum: 0,
+				err:   fmt.Errorf("unknown event"),
+			},
+		},
+		{
+			name: "receive 1 delete message, delete 0 kvDoc, expected: error is not nil, cache should store 0 kvDoc",
+			args: args{
+				rsp: []*etcdadpt.Response{{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{}}},
+			},
+			want: want{
+				kvNum: 0,
+				err:   fmt.Errorf("unknown event"),
+			},
+		},
+		{
+			name: "receive put message, put 1 kvDocs, expected: error is nil, cache should store 1 kvDocs",
+			args: args{
+				rsp: []*etcdadpt.Response{
+					{
+						Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}}},
+				},
+			},
+			want: want{
+				kvNum: 1,
+				err:   nil,
+			},
+		},
+		{
+			name: "receive 1 put message, 1 delete message, first put 1 kvDoc, then delete it, expected: error is nil, cache should store 0 kvDoc",
+			args: args{
+				rsp: []*etcdadpt.Response{
+					{
+						Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}},
+					},
+					{
+						Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}},
+					},
+				},
+			},
+			want: want{
+				kvNum: 0,
+				err:   nil,
+			},
+		},
+		{
+			name: "receive put message put 1 kvDoc, but labels are not cached, cache should store 0 kvDoc",
+			args: args{
+				[]*etcdadpt.Response{
+					{
+						Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"env":"testing"}}`)}},
+					},
+				},
+			},
+			want: want{
+				kvNum: 0,
+				err:   nil,
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			kc := NewKvCache()
+			for _, rsp := range tt.args.rsp {
+				err := kc.watchCallBack("", rsp)
+				assert.Equal(t, tt.want.err, err)
+			}
+			num := kc.kvDocCache.ItemCount()
+			assert.Equal(t, tt.want.kvNum, num)
+		})
+	}
+}
+
+func TestStoreAndLoadKvDoc(t *testing.T) {
+	type want struct {
+		kvDoc *model.KVDoc
+		exist bool
+	}
+	type args struct {
+		kvID               string
+		kvDoc              *model.KVDoc
+		expireTime         time.Duration
+		waitTimeAfterStore time.Duration
+	}
+	tests := []struct {
+		name string
+		args args
+		want want
+	}{
+		{
+			name: "store 1 kv and the expire time is 1 seconds, then load the kv with no wait time, expect: load kv successfully",
+			args: args{
+				kvID: "",
+				kvDoc: &model.KVDoc{
+					ID:    "1",
+					Key:   "withFood",
+					Value: "yes",
+					Labels: map[string]string{
+						"environment": "testing",
+					},
+				},
+				expireTime:         1 * time.Second,
+				waitTimeAfterStore: 0,
+			},
+			want: want{
+				kvDoc: &model.KVDoc{
+					ID:    "1",
+					Key:   "withFood",
+					Value: "yes",
+					Labels: map[string]string{
+						"environment": "testing",
+					},
+				},
+				exist: true,
+			},
+		},
+		{
+			name: "store 1 kv and the expire time is 1 seconds, after waiting 2 seconds, then load the kv, expect: unable to load the kv",
+			args: args{
+				kvID: "",
+				kvDoc: &model.KVDoc{
+					ID:    "1",
+					Key:   "withFood",
+					Value: "yes",
+					Labels: map[string]string{
+						"environment": "testing",
+					},
+				},
+				expireTime:         1 * time.Second,
+				waitTimeAfterStore: 2 * time.Second,
+			},
+			want: want{
+				kvDoc: nil,
+				exist: false,
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			kc := NewKvCache()
+			kc.kvDocCache = goCache.New(tt.args.expireTime, tt.args.expireTime)
+			kc.StoreKvDoc(tt.args.kvID, tt.args.kvDoc)
+			time.Sleep(tt.args.waitTimeAfterStore)
+			doc, exist := kc.LoadKvDoc(tt.args.kvID)
+			assert.Equal(t, tt.want.exist, exist)
+			reflect.DeepEqual(tt.want.kvDoc, doc)
+		})
+	}
+}