You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2020/01/10 06:08:38 UTC

[servicecomb-service-center] branch master updated: SCB-1712 Reset kv cache periodically (#618)

This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 21aa647  SCB-1712 Reset kv cache periodically (#618)
21aa647 is described below

commit 21aa647b138517442b8c839b68e4b57377b4622c
Author: little-cui <su...@qq.com>
AuthorDate: Fri Jan 10 14:08:28 2020 +0800

    SCB-1712 Reset kv cache periodically (#618)
    
    * SCB-1712 Reset kv cache periodically
    
    * SCB-1712 Reset kv cache periodically
---
 etc/conf/app.conf                                  |  2 +
 pkg/gopool/goroutines.go                           |  1 +
 pkg/util/util_test.go                              |  7 +++
 proxy.sh                                           | 16 +++++++
 server/core/backend/defer_instance.go              |  8 ++++
 server/core/backend/defer_test.go                  | 16 ++++++-
 server/core/backend/discovery.go                   | 26 +++++++++++
 server/core/config.go                              |  8 ++++
 server/core/proto/parser.go                        | 28 +++++++++++
 server/core/proto/parser_test.go                   | 18 +++++++-
 server/core/proto/types.go                         |  2 +
 server/plugin/pkg/discovery/cache.go               |  6 +++
 server/plugin/pkg/discovery/cache_kv.go            | 35 +++++++++-----
 server/plugin/pkg/discovery/cache_null.go          |  3 ++
 server/plugin/pkg/discovery/etcd/cacher_kv.go      | 54 ++++++++++++++++++----
 server/plugin/pkg/discovery/etcd/cacher_kv_test.go |  4 +-
 server/plugin/pkg/discovery/indexer_test.go        |  1 -
 server/service/util/microservice_util.go           |  5 +-
 18 files changed, 213 insertions(+), 27 deletions(-)

diff --git a/etc/conf/app.conf b/etc/conf/app.conf
index ff85148..41eb652 100644
--- a/etc/conf/app.conf
+++ b/etc/conf/app.conf
@@ -87,6 +87,8 @@ compact_interval = 12h
 # registry cache, if this option value set 0, service center can run
 # in lower memory but no longer push the events to client.
 enable_cache = 1
+# the cache will be clear after X, if not set cache will be never clear
+cache_ttl = ""
 
 # pluggable cipher
 cipher_plugin = ""
diff --git a/pkg/gopool/goroutines.go b/pkg/gopool/goroutines.go
index e096791..79b9237 100644
--- a/pkg/gopool/goroutines.go
+++ b/pkg/gopool/goroutines.go
@@ -75,6 +75,7 @@ func (g *Pool) execute(f func(ctx context.Context)) {
 	f(g.ctx)
 }
 
+// Do pick one idle goroutine to do the f once
 func (g *Pool) Do(f func(context.Context)) *Pool {
 	defer log.Recover()
 	select {
diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go
index a168ddf..e0294df 100644
--- a/pkg/util/util_test.go
+++ b/pkg/util/util_test.go
@@ -172,3 +172,10 @@ func TestGetEnvString(t *testing.T) {
 		t.Fatalf("TestGetEnvInt failed")
 	}
 }
+
+func TestBytesToStringWithNoCopy(t *testing.T) {
+	s := BytesToStringWithNoCopy(nil)
+	if s != "" {
+		t.Fatal("TestBytesToStringWithNoCopy failed")
+	}
+}
diff --git a/proxy.sh b/proxy.sh
new file mode 100755
index 0000000..5fcc641
--- /dev/null
+++ b/proxy.sh
@@ -0,0 +1,16 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+export GOPROXY=https://goproxy.io
diff --git a/server/core/backend/defer_instance.go b/server/core/backend/defer_instance.go
index 8c7b62c..74faf2d 100644
--- a/server/core/backend/defer_instance.go
+++ b/server/core/backend/defer_instance.go
@@ -63,6 +63,10 @@ func (iedh *InstanceEventDeferHandler) OnCondition(cache discovery.CacheReader,
 }
 
 func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt discovery.KvEvent) {
+	if evt.KV == nil {
+		log.Errorf(nil, "defer or recover a %s nil KV", evt.Type)
+		return
+	}
 	kv := evt.KV
 	key := util.BytesToStringWithNoCopy(kv.Key)
 	_, ok := iedh.items[key]
@@ -79,6 +83,10 @@ func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt discovery.KvEvent) {
 		}
 
 		instance := kv.Value.(*pb.MicroServiceInstance)
+		if instance == nil {
+			log.Errorf(nil, "defer or recover a %s nil Value, KV is %v", evt.Type, kv)
+			return
+		}
 		ttl := instance.HealthCheck.Interval * (instance.HealthCheck.Times + 1)
 		if ttl <= 0 || ttl > selfPreservationMaxTTL {
 			ttl = selfPreservationMaxTTL
diff --git a/server/core/backend/defer_test.go b/server/core/backend/defer_test.go
index 2b47546..deb3172 100644
--- a/server/core/backend/defer_test.go
+++ b/server/core/backend/defer_test.go
@@ -41,7 +41,6 @@ func (n *mockCache) GetAll(arr *[]*discovery.KeyValue) (i int) {
 func (n *mockCache) GetPrefix(prefix string, arr *[]*discovery.KeyValue) int        { return 0 }
 func (n *mockCache) ForEach(iter func(k string, v *discovery.KeyValue) (next bool)) {}
 func (n *mockCache) Put(k string, v *discovery.KeyValue)                            { n.c[k] = v }
-func (n *mockCache) Remove(k string)                                                { delete(n.c, k) }
 
 func TestInstanceEventDeferHandler_OnCondition(t *testing.T) {
 	iedh := &InstanceEventDeferHandler{
@@ -208,3 +207,18 @@ func getEvents(t *testing.T, iedh *InstanceEventDeferHandler) {
 		break
 	}
 }
+
+func TestConvert(t *testing.T) {
+	value := discovery.NewKeyValue()
+	_, ok := value.Value.(*pb.MicroServiceInstance)
+	if ok {
+		t.Fatal("TestConvert failed")
+	}
+
+	var inst *pb.MicroServiceInstance = nil
+	value.Value = inst
+	_, ok = value.Value.(*pb.MicroServiceInstance)
+	if !ok {
+		t.Fatal("TestConvert failed")
+	}
+}
diff --git a/server/core/backend/discovery.go b/server/core/backend/discovery.go
index b58557b..170923c 100644
--- a/server/core/backend/discovery.go
+++ b/server/core/backend/discovery.go
@@ -22,11 +22,13 @@ import (
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/task"
 	"github.com/apache/servicecomb-service-center/pkg/util"
+	"github.com/apache/servicecomb-service-center/server/core"
 	"github.com/apache/servicecomb-service-center/server/plugin"
 	"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
 	"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
 	"golang.org/x/net/context"
 	"sync"
+	"time"
 )
 
 var store = &KvStore{}
@@ -84,6 +86,7 @@ func (s *KvStore) getOrCreateAdaptor(t discovery.Type) discovery.Adaptor {
 
 func (s *KvStore) Run() {
 	s.goroutine.Do(s.store)
+	s.goroutine.Do(s.autoClearCache)
 	s.taskService.Run()
 }
 
@@ -102,6 +105,29 @@ func (s *KvStore) store(ctx context.Context) {
 	log.Debugf("all adaptors are ready")
 }
 
+func (s *KvStore) autoClearCache(ctx context.Context) {
+	if core.ServerInfo.Config.CacheTTL == 0 {
+		return
+	}
+
+	log.Infof("start auto clear cache in %v", core.ServerInfo.Config.CacheTTL)
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(core.ServerInfo.Config.CacheTTL):
+			for _, t := range discovery.Types {
+				cache, ok := s.getOrCreateAdaptor(t).Cache().(discovery.Cache)
+				if !ok {
+					log.Error("the discovery adaptor does not implement the Cache", nil)
+					break
+				}
+				cache.MarkDirty()
+			}
+		}
+	}
+}
+
 func (s *KvStore) closed() bool {
 	return s.isClose
 }
diff --git a/server/core/config.go b/server/core/config.go
index ae6ac84..00948fe 100644
--- a/server/core/config.go
+++ b/server/core/config.go
@@ -37,6 +37,7 @@ const (
 
 	minServiceClearInterval = 30 * time.Second
 	minServiceTTL           = 30 * time.Second
+	minCacheTTL             = 5 * time.Minute
 
 	maxServiceClearInterval = 24 * time.Hour       //1 day
 	maxServiceTTL           = 24 * 365 * time.Hour //1 year
@@ -76,6 +77,12 @@ func newInfo() pb.ServerInformation {
 		serviceTTL = defaultServiceTTL
 	}
 
+	cacheTTL, err := time.ParseDuration(
+		util.GetEnvString("CACHE_TTL", beego.AppConfig.DefaultString("cache_ttl", "")))
+	if err == nil && cacheTTL < minCacheTTL {
+		cacheTTL = minCacheTTL
+	}
+
 	return pb.ServerInformation{
 		Version: InitVersion,
 		Config: pb.ServerConfig{
@@ -113,6 +120,7 @@ func newInfo() pb.ServerInformation {
 
 			EnablePProf:  beego.AppConfig.DefaultInt("enable_pprof", 0) != 0,
 			EnableCache:  beego.AppConfig.DefaultInt("enable_cache", 1) != 0,
+			CacheTTL:     cacheTTL,
 			SelfRegister: beego.AppConfig.DefaultInt("self_register", 1) != 0,
 
 			ServiceClearEnabled:  os.Getenv("SERVICE_CLEAR_ENABLED") == "true",
diff --git a/server/core/proto/parser.go b/server/core/proto/parser.go
index c746abc..f361625 100644
--- a/server/core/proto/parser.go
+++ b/server/core/proto/parser.go
@@ -17,9 +17,15 @@ package proto
 
 import (
 	"encoding/json"
+	"errors"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 )
 
+var (
+	errParseNilPoint  = errors.New("parse nil point")
+	errTargetNilPoint = errors.New("target is nil point")
+)
+
 // new
 type CreateValueFunc func() interface{}
 
@@ -40,21 +46,33 @@ type ParseValueFunc func(src []byte, dist interface{}) error
 
 var (
 	UnParse ParseValueFunc = func(src []byte, dist interface{}) error {
+		if err := check(src, dist); err != nil {
+			return err
+		}
 		d := dist.(*interface{})
 		*d = src
 		return nil
 	}
 	TextUnmarshal ParseValueFunc = func(src []byte, dist interface{}) error {
+		if err := check(src, dist); err != nil {
+			return err
+		}
 		d := dist.(*interface{})
 		*d = util.BytesToStringWithNoCopy(src)
 		return nil
 	}
 	MapUnmarshal ParseValueFunc = func(src []byte, dist interface{}) error {
+		if err := check(src, dist); err != nil {
+			return err
+		}
 		d := dist.(*interface{})
 		m := (*d).(map[string]string)
 		return json.Unmarshal(src, &m)
 	}
 	JsonUnmarshal ParseValueFunc = func(src []byte, dist interface{}) error {
+		if err := check(src, dist); err != nil {
+			return err
+		}
 		d := dist.(*interface{})
 		return json.Unmarshal(src, *d)
 	}
@@ -89,3 +107,13 @@ var (
 	DependencyRuleParser  = &CommonParser{newDependencyRule, JsonUnmarshal}
 	DependencyQueueParser = &CommonParser{newDependencyQueue, JsonUnmarshal}
 )
+
+func check(src []byte, dist interface{}) error {
+	if src == nil {
+		return errParseNilPoint
+	}
+	if dist == nil {
+		return errTargetNilPoint
+	}
+	return nil
+}
diff --git a/server/core/proto/parser_test.go b/server/core/proto/parser_test.go
index d607fda..43de2e1 100644
--- a/server/core/proto/parser_test.go
+++ b/server/core/proto/parser_test.go
@@ -21,7 +21,11 @@ import (
 )
 
 func TestParseInnerValueTypeFunc(t *testing.T) {
-	r, err := BytesParser.Unmarshal([]byte("a"))
+	r, err := BytesParser.Unmarshal(nil)
+	if err == nil {
+		t.Fatalf("BytesParser.Unmarshal failed")
+	}
+	r, err = BytesParser.Unmarshal([]byte("a"))
 	if err != nil {
 		t.Fatalf("BytesParser.Unmarshal failed, %s", err.Error())
 	}
@@ -29,6 +33,10 @@ func TestParseInnerValueTypeFunc(t *testing.T) {
 		t.Fatalf("BytesParser.Unmarshal failed, %s", v)
 	}
 
+	r, err = StringParser.Unmarshal(nil)
+	if err == nil {
+		t.Fatalf("StringParser.Unmarshal failed")
+	}
 	r, err = StringParser.Unmarshal([]byte("abc"))
 	if err != nil {
 		t.Fatalf("StringParser.Unmarshal failed, %s", err.Error())
@@ -37,6 +45,10 @@ func TestParseInnerValueTypeFunc(t *testing.T) {
 		t.Fatalf("StringParser.Unmarshal failed, %s", v)
 	}
 
+	r, err = MapParser.Unmarshal(nil)
+	if err == nil {
+		t.Fatalf("MapParser.Unmarshal failed")
+	}
 	r, err = MapParser.Unmarshal([]byte(`{"a": "abc"}`))
 	if err != nil {
 		t.Fatalf("MapParser.Unmarshal failed, %s", err.Error())
@@ -51,6 +63,10 @@ func TestParseInnerValueTypeFunc(t *testing.T) {
 	}
 
 	var m interface{} = new(MicroService)
+	err = JsonUnmarshal(nil, nil)
+	if err == nil {
+		t.Fatalf("JsonUnmarshal failed")
+	}
 	err = JsonUnmarshal([]byte(`{"serviceName": "abc"}`), &m)
 	if err != nil {
 		t.Fatalf("MapParser.Unmarshal failed, %v", err)
diff --git a/server/core/proto/types.go b/server/core/proto/types.go
index a157ebf..3acde78 100644
--- a/server/core/proto/types.go
+++ b/server/core/proto/types.go
@@ -69,6 +69,8 @@ type ServerConfig struct {
 	ServiceClearInterval time.Duration `json:"serviceClearInterval"`
 	//if a service's existence time reaches this value, it can be cleared
 	ServiceTTL time.Duration `json:"serviceTTL"`
+	//CacheTTL is the ttl of cache
+	CacheTTL time.Duration `json:"cacheTTL"`
 }
 
 type ServerInformation struct {
diff --git a/server/plugin/pkg/discovery/cache.go b/server/plugin/pkg/discovery/cache.go
index b0ac547..1dc6268 100644
--- a/server/plugin/pkg/discovery/cache.go
+++ b/server/plugin/pkg/discovery/cache.go
@@ -30,6 +30,12 @@ type Cache interface {
 	Put(k string, v *KeyValue)
 	// Remove removes a k-v data
 	Remove(k string)
+	// MarkDirty mark k-v data dirty
+	MarkDirty()
+	// Dirty k-v data is dirty or not
+	Dirty() bool
+	// Clear clear all k-v data
+	Clear()
 }
 
 // CacheReader reads k-v data.
diff --git a/server/plugin/pkg/discovery/cache_kv.go b/server/plugin/pkg/discovery/cache_kv.go
index f51f1d2..9776501 100644
--- a/server/plugin/pkg/discovery/cache_kv.go
+++ b/server/plugin/pkg/discovery/cache_kv.go
@@ -17,22 +17,21 @@
 package discovery
 
 import (
+	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 	"strings"
 	"sync"
-	"time"
 )
 
 // KvCache implements Cache.
 // KvCache is dedicated to stores service discovery data,
 // e.g. service, instance, lease.
 type KvCache struct {
-	Cfg         *Config
-	name        string
-	store       map[string]map[string]*KeyValue
-	rwMux       sync.RWMutex
-	lastRefresh time.Time
-	lastMaxSize int
+	Cfg   *Config
+	name  string
+	store map[string]map[string]*KeyValue
+	rwMux sync.RWMutex
+	dirty bool
 }
 
 func (c *KvCache) Name() string {
@@ -82,6 +81,21 @@ func (c *KvCache) Remove(key string) {
 	c.rwMux.Unlock()
 }
 
+func (c *KvCache) MarkDirty() {
+	c.dirty = true
+	log.Warnf("Cache[%s] is marked dirty!", c.name)
+}
+
+func (c *KvCache) Dirty() bool { return c.dirty }
+
+func (c *KvCache) Clear() {
+	c.rwMux.Lock()
+	c.dirty = false
+	c.store = make(map[string]map[string]*KeyValue)
+	c.rwMux.Unlock()
+	log.Warnf("Cache[%s] is clear!", c.name)
+}
+
 func (c *KvCache) ForEach(iter func(k string, v *KeyValue) (next bool)) {
 	c.rwMux.RLock()
 loopParent:
@@ -176,9 +190,8 @@ func (c *KvCache) deletePrefixKey(key string) {
 
 func NewKvCache(name string, cfg *Config) *KvCache {
 	return &KvCache{
-		Cfg:         cfg,
-		name:        name,
-		store:       make(map[string]map[string]*KeyValue),
-		lastRefresh: time.Now(),
+		Cfg:   cfg,
+		name:  name,
+		store: make(map[string]map[string]*KeyValue),
 	}
 }
diff --git a/server/plugin/pkg/discovery/cache_null.go b/server/plugin/pkg/discovery/cache_null.go
index 8bb00fe..1f12041 100644
--- a/server/plugin/pkg/discovery/cache_null.go
+++ b/server/plugin/pkg/discovery/cache_null.go
@@ -32,6 +32,9 @@ func (n *nullCache) GetPrefix(prefix string, arr *[]*KeyValue) int        { retu
 func (n *nullCache) ForEach(iter func(k string, v *KeyValue) (next bool)) {}
 func (n *nullCache) Put(k string, v *KeyValue)                            {}
 func (n *nullCache) Remove(k string)                                      {}
+func (n *nullCache) MarkDirty()                                           {}
+func (n *nullCache) Dirty() bool                                          { return false }
+func (n *nullCache) Clear()                                               {}
 
 type nullCacher struct {
 }
diff --git a/server/plugin/pkg/discovery/etcd/cacher_kv.go b/server/plugin/pkg/discovery/etcd/cacher_kv.go
index 442c0fc..53d9dea 100644
--- a/server/plugin/pkg/discovery/etcd/cacher_kv.go
+++ b/server/plugin/pkg/discovery/etcd/cacher_kv.go
@@ -79,19 +79,42 @@ func (c *KvCacher) doList(cfg ListWatchConfig) error {
 
 	kvs := resp.Kvs
 	start := time.Now()
+	defer log.LogDebugOrWarnf(start, "finish to cache key %s, %d items, rev: %d",
+		c.Cfg.Key, len(kvs), c.lw.Revision())
+
+	// calc and return the diff between cache and ETCD
 	evts := c.filter(c.lw.Revision(), kvs)
+
+	// just reset the cacher if cache marked dirty
+	if c.cache.Dirty() {
+		c.reset(evts)
+		return nil
+	}
+
+	// there is no change between List() and cache, then stop the self preservation
 	if ec, kc := len(evts), len(kvs); c.Cfg.DeferHandler != nil && ec == 0 && kc != 0 &&
 		c.Cfg.DeferHandler.Reset() {
 		log.Warnf("most of the protected data(%d/%d) are recovered",
 			kc, c.cache.GetAll(nil))
 	}
-	c.sync(evts)
-	log.LogDebugOrWarnf(start, "finish to cache key %s, %d items, rev: %d",
-		c.Cfg.Key, len(kvs), c.lw.Revision())
 
+	// notify the subscribers
+	c.sync(evts)
 	return nil
 }
 
+func (c *KvCacher) reset(evts []discovery.KvEvent) {
+	if c.Cfg.DeferHandler != nil {
+		c.Cfg.DeferHandler.Reset()
+	}
+	// clear cache before Set is safe, because the watch operation is stop,
+	// but here will make all API requests go to ETCD directly.
+	c.cache.Clear()
+	// do not notify when cacher is dirty status,
+	// otherwise, too many events will notify to downstream.
+	c.buildCache(evts)
+}
+
 func (c *KvCacher) doWatch(cfg ListWatchConfig) error {
 	if watcher := c.lw.Watch(cfg); watcher != nil {
 		return c.handleWatcher(watcher)
@@ -102,6 +125,7 @@ func (c *KvCacher) doWatch(cfg ListWatchConfig) error {
 func (c *KvCacher) ListAndWatch(ctx context.Context) error {
 	c.mux.Lock()
 	defer c.mux.Unlock()
+	defer log.Recover() // ensure ListAndWatch never raise panic
 
 	cfg := ListWatchConfig{
 		Timeout: c.Cfg.Timeout,
@@ -111,7 +135,7 @@ func (c *KvCacher) ListAndWatch(ctx context.Context) error {
 	// the scenario need to list etcd:
 	// 1. Initial: cache is building, the lister's revision is 0.
 	// 2. Runtime: error occurs in previous watch operation, the lister's revision is set to 0.
-	// 3. Runtime: no event comes in watch operation over DEFAULT_FORCE_LIST_INTERVAL times.
+	// 3. Runtime: watch operation timed out over DEFAULT_FORCE_LIST_INTERVAL times.
 	if c.needList() {
 		if err := c.doList(cfg); err != nil && (!c.IsReady() || c.lw.Revision() == 0) {
 			return err // do retry to list etcd
@@ -147,7 +171,7 @@ func (c *KvCacher) handleWatcher(watcher Watcher) error {
 			case resp.Action == registry.Delete:
 				evt.Type = proto.EVT_DELETE
 				if kv.Value == nil {
-					// it will happen in embed mode, and then need to get the cache value to unmarshal
+					// it will happen in embed mode, and then need to get the cache value not unmarshal
 					evt.KV = c.cache.Get(util.BytesToStringWithNoCopy(kv.Key))
 				} else {
 					evt.KV = c.doParse(kv)
@@ -157,6 +181,7 @@ func (c *KvCacher) handleWatcher(watcher Watcher) error {
 				continue
 			}
 			if evt.KV == nil {
+				log.Errorf(nil, "failed to parse KeyValue %v", kv)
 				continue
 			}
 			evts = append(evts, evt)
@@ -320,7 +345,18 @@ func (c *KvCacher) deferHandle(ctx context.Context) {
 	if c.Cfg.DeferHandler == nil {
 		return
 	}
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			c.handleDeferEvents(ctx)
+		}
+	}
+}
 
+func (c *KvCacher) handleDeferEvents(ctx context.Context) {
+	defer log.Recover()
 	var (
 		evts = make([]discovery.KvEvent, eventBlockSize)
 		i    int
@@ -362,6 +398,11 @@ func (c *KvCacher) deferHandle(ctx context.Context) {
 }
 
 func (c *KvCacher) onEvents(evts []discovery.KvEvent) {
+	c.buildCache(evts)
+	c.notify(evts)
+}
+
+func (c *KvCacher) buildCache(evts []discovery.KvEvent) {
 	init := !c.IsReady()
 	for i, evt := range evts {
 		key := util.BytesToStringWithNoCopy(evt.KV.Key)
@@ -396,9 +437,6 @@ func (c *KvCacher) onEvents(evts []discovery.KvEvent) {
 			evts[i] = evt
 		}
 	}
-
-	c.notify(evts)
-
 	discovery.ReportProcessEventCompleted(c.Cfg.Key, evts)
 }
 
diff --git a/server/plugin/pkg/discovery/etcd/cacher_kv_test.go b/server/plugin/pkg/discovery/etcd/cacher_kv_test.go
index 61f5237..dea9d41 100644
--- a/server/plugin/pkg/discovery/etcd/cacher_kv_test.go
+++ b/server/plugin/pkg/discovery/etcd/cacher_kv_test.go
@@ -39,7 +39,6 @@ type mockCache struct {
 func (n *mockCache) Name() string                          { return "NULL" }
 func (n *mockCache) Size() int                             { return 0 }
 func (n *mockCache) GetAll(arr *[]*discovery.KeyValue) int { return 0 }
-
 func (n *mockCache) Get(k string) *discovery.KeyValue {
 	if k == n.Key {
 		return n.KV
@@ -68,6 +67,9 @@ func (n *mockCache) Remove(k string) {
 		n.KV = nil
 	}
 }
+func (n *mockCache) MarkDirty()  {}
+func (n *mockCache) Dirty() bool { return false }
+func (n *mockCache) Clear()      {}
 
 func TestNewKvCacher(t *testing.T) {
 	w := &mockWatcher{}
diff --git a/server/plugin/pkg/discovery/indexer_test.go b/server/plugin/pkg/discovery/indexer_test.go
index 770717f..ce97bc3 100644
--- a/server/plugin/pkg/discovery/indexer_test.go
+++ b/server/plugin/pkg/discovery/indexer_test.go
@@ -49,7 +49,6 @@ func (n *mockCache) Put(k string, v *KeyValue) {
 	n.Key = k
 	n.KV = v
 }
-func (n *mockCache) Remove(k string) {}
 
 func TestCacheIndexer_Search(t *testing.T) {
 	c := &mockCache{}
diff --git a/server/service/util/microservice_util.go b/server/service/util/microservice_util.go
index 1972395..46a66ca 100644
--- a/server/service/util/microservice_util.go
+++ b/server/service/util/microservice_util.go
@@ -101,12 +101,9 @@ func GetAllServicesAcrossDomainProject(ctx context.Context) (map[string][]*pb.Mi
 		domainProject := parts[4] + apt.SPLIT + parts[5]
 		microService, ok := value.Value.(*pb.MicroService)
 		if !ok {
-			log.Error("backend data is not type *pb.MicroService", nil)
+			log.Errorf(nil, "backend key[%s]'s value is not type *pb.MicroService", prefix)
 			continue
 		}
-		if _, ok := services[domainProject]; !ok {
-			services[domainProject] = make([]*pb.MicroService, 0)
-		}
 		services[domainProject] = append(services[domainProject], microService)
 	}
 	return services, nil