You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/04/24 07:57:26 UTC
[incubator-servicecomb-service-center] branch master updated:
SCB-508 Cache does not match the etcd store (#332)
This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 24eb859 SCB-508 Cache does not match the etcd store (#332)
24eb859 is described below
commit 24eb859334ff32da88306c695778a303b572e39a
Author: little-cui <su...@qq.com>
AuthorDate: Tue Apr 24 15:57:23 2018 +0800
SCB-508 Cache does not match the etcd store (#332)
* reuse timer
* SCB-508 Cache does not match the etcd store
* SCB-508 Cache does not match the etcd store
* SCB-508 Cache does not match the etcd store
* SCB-513 Optimize Event handle
* SCB-508 Set Cacher ready after sync cache
* SCB-508 Add cache size metric
* SCB-508 Add cache size metric
* SCB-508 Use standard testing output
* SCB-508 Optimize store
---
integration/health-metrics-grafana.json | 117 ++++-
pkg/async/async_task_test.go | 42 +-
pkg/chain/chain_test.go | 2 +
pkg/plugin/loader_test.go | 28 +-
pkg/util/concurrent_map_test.go | 30 +-
pkg/util/goroutines_test.go | 10 +-
pkg/util/log_test.go | 8 +-
pkg/util/net_test.go | 16 +-
pkg/util/reflect.go | 85 +++-
pkg/util/reflect_test.go | 38 +-
pkg/util/tree_test.go | 2 +-
pkg/util/uniqueue_test.go | 26 +-
pkg/util/util_test.go | 7 -
server/broker/store.go | 4 +-
.../core/backend/store/{cacher.go => cache_kv.go} | 250 ++++------
.../core/backend/store/cache_null.go | 85 ++--
server/core/backend/store/cacher.go | 549 +--------------------
server/core/backend/store/cacher_test.go | 84 ++++
server/core/backend/store/common.go | 133 +++++
server/core/backend/store/defer.go | 140 +-----
.../backend/store/{defer.go => defer_instance.go} | 59 ++-
server/core/backend/store/defer_test.go | 61 ++-
server/core/backend/store/event.go | 14 +-
server/core/backend/store/indexer.go | 41 +-
server/core/backend/store/listwatch.go | 48 +-
.../handler.go => core/backend/store/metric.go} | 41 +-
server/core/backend/store/opt.go | 7 +-
server/core/backend/store/store.go | 123 +----
server/core/proto/services.go | 2 +
server/plugin/infra/registry/etcd/etcd_test.go | 3 +-
.../infra/tracing/buildin/file_collector_test.go | 4 +-
server/plugin/infra/tracing/buildin/span_test.go | 9 +-
server/rest/handler.go | 2 +-
server/rest/metric.go | 2 +-
server/rest/server.go | 1 +
pkg/util/util_test.go => server/rest/server_mux.go | 47 +-
server/service/event/dependency_event_handler.go | 4 +-
server/service/event/instance_event_handler.go | 7 +-
server/service/event/rule_event_handler.go | 7 +-
server/service/event/service_event_handler.go | 7 +-
server/service/event/tag_event_handler.go | 7 +-
server/service/util/dependency_test.go | 157 ++----
server/service/util/domain_util_test.go | 34 +-
server/service/util/heartbeat_util_test.go | 7 +-
server/service/util/instance_util_test.go | 40 +-
server/service/util/rule_util_test.go | 94 ++--
server/service/util/tag_util_test.go | 10 +-
server/service/util/util_suite_test.go | 15 +-
48 files changed, 966 insertions(+), 1543 deletions(-)
diff --git a/integration/health-metrics-grafana.json b/integration/health-metrics-grafana.json
index aff03ab..55e5c69 100644
--- a/integration/health-metrics-grafana.json
+++ b/integration/health-metrics-grafana.json
@@ -1,18 +1,3 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
{
"annotations": {
"list": [
@@ -31,7 +16,7 @@
"gnetId": null,
"graphTooltip": 0,
"hideControls": false,
- "id": 5,
+ "id": 6,
"links": [
{
"icon": "external link",
@@ -57,9 +42,9 @@
"height": "",
"id": 1,
"legend": {
- "alignAsTable": false,
+ "alignAsTable": true,
"avg": true,
- "current": false,
+ "current": true,
"max": false,
"min": false,
"show": true,
@@ -448,7 +433,7 @@
"repeatIteration": null,
"repeatRowId": null,
"showTitle": false,
- "title": "Dashboard Row",
+ "title": "service metrics",
"titleSize": "h6"
},
{
@@ -571,7 +556,7 @@
"legend": {
"alignAsTable": true,
"avg": true,
- "current": false,
+ "current": true,
"max": true,
"min": true,
"show": true,
@@ -649,7 +634,7 @@
"legend": {
"alignAsTable": true,
"avg": true,
- "current": false,
+ "current": true,
"max": true,
"min": true,
"show": true,
@@ -909,7 +894,7 @@
"dashes": false,
"datasource": null,
"fill": 1,
- "id": 7,
+ "id": 14,
"legend": {
"alignAsTable": true,
"avg": true,
@@ -977,6 +962,92 @@
"show": true
}
]
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": null,
+ "fill": 1,
+ "id": 7,
+ "legend": {
+ "alignAsTable": true,
+ "avg": false,
+ "current": true,
+ "max": true,
+ "min": false,
+ "rightSide": true,
+ "show": true,
+ "sort": "current",
+ "sortDesc": true,
+ "total": false,
+ "values": true
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "span": 12,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr": "service_center_local_cache_size_bytes",
+ "format": "time_series",
+ "intervalFactor": 2,
+ "legendFormat": "{{resource}}.{{type}}",
+ "refId": "A"
+ },
+ {
+ "expr": "sum(service_center_local_cache_size_bytes)",
+ "format": "time_series",
+ "intervalFactor": 2,
+ "legendFormat": "ALL",
+ "refId": "B"
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "Cache",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "decbytes",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ]
}
],
"repeat": null,
@@ -994,7 +1065,7 @@
"list": []
},
"time": {
- "from": "now-5m",
+ "from": "now-15m",
"to": "now"
},
"timepicker": {
diff --git a/pkg/async/async_task_test.go b/pkg/async/async_task_test.go
index 0504799..230637a 100644
--- a/pkg/async/async_task_test.go
+++ b/pkg/async/async_task_test.go
@@ -21,28 +21,10 @@ import (
"errors"
"fmt"
- "github.com/apache/incubator-servicecomb-service-center/pkg/lager"
- "github.com/apache/incubator-servicecomb-service-center/pkg/util"
"golang.org/x/net/context"
"time"
)
-func init() {
- util.InitGlobalLogger("async_task_test", &log.Config{
- LoggerLevel: "DEBUG",
- LoggerFile: "",
- EnableRsyslog: false,
- LogFormatText: true,
- EnableStdOut: true,
- })
-}
-
-func fail(t *testing.T, format string, args ...interface{}) {
- fmt.Printf(format, args...)
- fmt.Println()
- t.FailNow()
-}
-
type testTask struct {
done context.CancelFunc
test string
@@ -82,7 +64,7 @@ func TestBaseAsyncTasker_AddTask(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
err := at.Add(ctx, nil)
if err == nil {
- fail(t, "add nil task should be error")
+ t.Fatalf("add nil task should be error")
}
cancel()
@@ -92,11 +74,11 @@ func TestBaseAsyncTasker_AddTask(t *testing.T) {
test: "test1",
})
if testCtx1.Err() == nil || err == nil || err.Error() != "test1" {
- fail(t, "first time add task should be sync")
+ t.Fatalf("first time add task should be sync")
}
lt, _ := at.LatestHandled("test")
if lt.Err().Error() != "test1" {
- fail(t, "should get first handled task 'test1'")
+ t.Fatalf("should get first handled task 'test1'")
}
testCtx2, testC2 := context.WithTimeout(context.Background(), 3*time.Second)
@@ -105,12 +87,12 @@ func TestBaseAsyncTasker_AddTask(t *testing.T) {
test: "test2",
})
if err.Error() != "test1" {
- fail(t, "second time add task should return prev result")
+ t.Fatalf("second time add task should return prev result")
}
<-testCtx2.Done()
lt, _ = at.LatestHandled("test")
if lt.Err().Error() != "test2" {
- fail(t, "should get second handled task 'test2'")
+ t.Fatalf("should get second handled task 'test2'")
}
}
@@ -126,7 +108,7 @@ func TestBaseAsyncTasker_Stop(t *testing.T) {
result: true,
})
if err != nil {
- fail(t, "add task should be ok")
+ t.Fatalf("add task should be ok")
}
_, cancel = context.WithCancel(context.Background())
err = at.Add(context.Background(), &testTask{
@@ -136,14 +118,14 @@ func TestBaseAsyncTasker_Stop(t *testing.T) {
result: true,
})
if err != nil {
- fail(t, "add task should be ok")
+ t.Fatalf("add task should be ok")
}
<-time.After(time.Second)
at.Stop()
err = at.Add(context.Background(), &testTask{result: true})
if err != nil {
- fail(t, "add task should be ok when Tasker is stopped")
+ t.Fatalf("add task should be ok when Tasker is stopped")
}
at.Stop()
@@ -155,7 +137,7 @@ func TestBaseAsyncTasker_RemoveTask(t *testing.T) {
err := at.DeferRemove("test")
if err != nil {
- fail(t, "remove task should be ok")
+ t.Fatalf("remove task should be ok")
}
_, cancel := context.WithCancel(context.Background())
err = at.Add(context.Background(), &testTask{
@@ -165,18 +147,18 @@ func TestBaseAsyncTasker_RemoveTask(t *testing.T) {
wait: 33 * time.Second,
})
if err != nil {
- fail(t, "add task should be ok")
+ t.Fatalf("add task should be ok")
}
fmt.Println("OK")
err = at.DeferRemove("test")
if err != nil {
- fail(t, "remove task should be ok")
+ t.Fatalf("remove task should be ok")
}
at.Stop()
err = at.DeferRemove("test")
if err == nil {
- fail(t, "remove task should be error when Tasker is stopped")
+ t.Fatalf("remove task should be error when Tasker is stopped")
}
}
diff --git a/pkg/chain/chain_test.go b/pkg/chain/chain_test.go
index 3e5b8c0..4a4b2cb 100644
--- a/pkg/chain/chain_test.go
+++ b/pkg/chain/chain_test.go
@@ -59,6 +59,7 @@ func BenchmarkChain(b *testing.B) {
}
})
b.ReportAllocs()
+ // 1000000 5119 ns/op 176 B/op 3 allocs/op
}
func BenchmarkSync(b *testing.B) {
@@ -70,4 +71,5 @@ func BenchmarkSync(b *testing.B) {
}
})
b.ReportAllocs()
+ // 1000000 13.7 ns/op 0 B/op 0 allocs/op
}
diff --git a/pkg/plugin/loader_test.go b/pkg/plugin/loader_test.go
index d969f44..a753ecc 100644
--- a/pkg/plugin/loader_test.go
+++ b/pkg/plugin/loader_test.go
@@ -17,7 +17,6 @@
package plugin
import (
- "fmt"
pg "plugin"
"testing"
)
@@ -25,8 +24,7 @@ import (
func TestLoader_Init(t *testing.T) {
defer func() {
if err := recover(); err != nil {
- fmt.Printf(`TestLoader_Init failed, %v`, err)
- t.FailNow()
+ t.Fatalf(`TestLoader_Init failed, %v`, err)
}
}()
loader := Loader{}
@@ -38,15 +36,13 @@ func TestLoader_ReloadPlugins(t *testing.T) {
loader.Init()
err := loader.ReloadPlugins()
if err != nil {
- fmt.Printf(`TestLoader_ReloadPlugins failed, %s`, err.Error())
- t.FailNow()
+ t.Fatalf(`TestLoader_ReloadPlugins failed, %s`, err.Error())
}
loader.Dir = "xxx"
err = loader.ReloadPlugins()
if err == nil {
- fmt.Printf(`TestLoader_ReloadPlugins failed`)
- t.FailNow()
+ t.Fatalf(`TestLoader_ReloadPlugins failed`)
}
}
@@ -55,8 +51,7 @@ func TestLoader_Exist(t *testing.T) {
loader.Init()
b := loader.Exist("")
if b {
- fmt.Printf(`TestLoader_Exist failed`)
- t.FailNow()
+ t.Fatalf(`TestLoader_Exist failed`)
}
}
@@ -65,15 +60,13 @@ func TestLoader_Find(t *testing.T) {
loader.Init()
f, err := loader.Find("", "")
if err == nil || f != nil {
- fmt.Printf(`TestLoader_Find failed`)
- t.FailNow()
+ t.Fatalf(`TestLoader_Find failed`)
}
loader.Plugins["a"] = &wrapPlugin{&pg.Plugin{}, make(map[string]pg.Symbol)}
f, err = loader.Find("a", "")
if err == nil || f != nil {
- fmt.Printf(`TestLoader_Find failed`)
- t.FailNow()
+ t.Fatalf(`TestLoader_Find failed`)
}
}
@@ -84,19 +77,16 @@ func TestSetPluginDir(t *testing.T) {
func TestPluginLoader(t *testing.T) {
loader := PluginLoader()
if loader == nil {
- fmt.Printf(`TestPluginLoader failed`)
- t.FailNow()
+ t.Fatalf(`TestPluginLoader failed`)
}
err := Reload()
if err != nil {
- fmt.Printf(`TestPluginLoader Reload failed, %s`, err)
- t.FailNow()
+ t.Fatalf(`TestPluginLoader Reload failed, %s`, err)
}
f, err := FindFunc("", "")
if err == nil || f != nil {
- fmt.Printf(`TestPluginLoader FindFunc failed`)
- t.FailNow()
+ t.Fatalf(`TestPluginLoader FindFunc failed`)
}
}
diff --git a/pkg/util/concurrent_map_test.go b/pkg/util/concurrent_map_test.go
index 90fa711..344de5d 100644
--- a/pkg/util/concurrent_map_test.go
+++ b/pkg/util/concurrent_map_test.go
@@ -24,52 +24,52 @@ func TestConcurrentMap(t *testing.T) {
cm := ConcurrentMap{}
s := cm.Size()
if s != 0 {
- fail(t, "TestConcurrentMap Size failed.")
+ t.Fatalf("TestConcurrentMap Size failed.")
}
v, b := cm.Get("a")
if b || v != nil {
- fail(t, "TestConcurrentMap Get a not exist item failed.")
+ t.Fatalf("TestConcurrentMap Get a not exist item failed.")
}
v = cm.Put("a", "1")
if v != nil {
- fail(t, "TestConcurrentMap Put a new item failed.")
+ t.Fatalf("TestConcurrentMap Put a new item failed.")
}
v, b = cm.Get("a")
if !b || v.(string) != "1" {
- fail(t, "TestConcurrentMap Get an exist item failed.")
+ t.Fatalf("TestConcurrentMap Get an exist item failed.")
}
v = cm.Put("a", "2")
if v.(string) != "1" {
- fail(t, "TestConcurrentMap Put an item again failed.")
+ t.Fatalf("TestConcurrentMap Put an item again failed.")
}
v = cm.PutIfAbsent("b", "1")
if v != nil {
- fail(t, "TestConcurrentMap PutIfAbsent a not exist item failed.")
+ t.Fatalf("TestConcurrentMap PutIfAbsent a not exist item failed.")
}
v = cm.PutIfAbsent("a", "3")
if v.(string) != "2" {
- fail(t, "TestConcurrentMap PutIfAbsent an item failed.")
+ t.Fatalf("TestConcurrentMap PutIfAbsent an item failed.")
}
v, b = cm.Get("a")
if !b || v.(string) != "2" {
- fail(t, "TestConcurrentMap Get an item after PutIfAbsent failed.")
+ t.Fatalf("TestConcurrentMap Get an item after PutIfAbsent failed.")
}
v = cm.Remove("a")
if v.(string) != "2" {
- fail(t, "TestConcurrentMap Remove an item failed.")
+ t.Fatalf("TestConcurrentMap Remove an item failed.")
}
v, b = cm.Get("a")
if b || v != nil {
- fail(t, "TestConcurrentMap Get an item after Remove failed.")
+ t.Fatalf("TestConcurrentMap Get an item after Remove failed.")
}
s = cm.Size()
if s != 1 { // only 'b' is left
- fail(t, "TestConcurrentMap Size after Put failed.")
+ t.Fatalf("TestConcurrentMap Size after Put failed.")
}
cm.Clear()
s = cm.Size()
if s != 0 {
- fail(t, "TestConcurrentMap Size after Clear failed.")
+ t.Fatalf("TestConcurrentMap Size after Clear failed.")
}
}
@@ -81,7 +81,7 @@ func TestConcurrentMap_ForEach(t *testing.T) {
return true
})
if l != 0 {
- fail(t, "TestConcurrentMap_ForEach failed.")
+ t.Fatalf("TestConcurrentMap_ForEach failed.")
}
for i := 0; i < 1000; i++ {
cm.Put(i, i)
@@ -92,14 +92,14 @@ func TestConcurrentMap_ForEach(t *testing.T) {
return true
})
if l != 1000 || cm.Size() != 0 {
- fail(t, "TestConcurrentMap_ForEach does not empty failed.")
+ t.Fatalf("TestConcurrentMap_ForEach does not empty failed.")
}
}
func TestNewConcurrentMap(t *testing.T) {
cm := NewConcurrentMap(100)
if cm.size != 100 {
- fail(t, "TestNewConcurrentMap failed.")
+ t.Fatalf("TestNewConcurrentMap failed.")
}
}
diff --git a/pkg/util/goroutines_test.go b/pkg/util/goroutines_test.go
index d808814..4f72d0e 100644
--- a/pkg/util/goroutines_test.go
+++ b/pkg/util/goroutines_test.go
@@ -32,7 +32,7 @@ func TestGoRoutine_Do(t *testing.T) {
defer close(stopCh1)
select {
case <-ctx.Done():
- fail(t, "ctx should not be done.")
+ t.Fatalf("ctx should not be done.")
case <-time.After(time.Second):
}
})
@@ -47,7 +47,7 @@ func TestGoRoutine_Do(t *testing.T) {
select {
case <-ctx.Done():
case <-time.After(time.Second):
- fail(t, "time out to wait stopCh2 close.")
+ t.Fatalf("time out to wait stopCh2 close.")
}
})
cancel()
@@ -62,7 +62,7 @@ func TestGoRoutine_Do(t *testing.T) {
select {
case <-ctx.Done():
case <-time.After(time.Second):
- fail(t, "time out to wait ctx done.")
+ t.Fatalf("time out to wait ctx done.")
}
})
<-stopCh3
@@ -92,7 +92,7 @@ func TestGoRoutine_Wait(t *testing.T) {
test.Wait()
fmt.Println(resultArr)
if len(resultArr) != MAX {
- fail(t, "fail to wait all goroutines finish.")
+ t.Fatalf("fail to wait all goroutines finish.")
}
}
@@ -102,7 +102,7 @@ func TestGoRoutine_Close(t *testing.T) {
select {
case <-ctx.Done():
case <-time.After(time.Second):
- fail(t, "time out to wait ctx close.")
+ t.Fatalf("time out to wait ctx close.")
}
})
test.Close(true)
diff --git a/pkg/util/log_test.go b/pkg/util/log_test.go
index bd030d0..d513e22 100644
--- a/pkg/util/log_test.go
+++ b/pkg/util/log_test.go
@@ -24,21 +24,21 @@ func TestLogger(t *testing.T) {
CustomLogger("Not Exist", "testDefaultLOGGER")
l := Logger()
if l != LOGGER {
- fail(t, "should equal to LOGGER")
+ t.Fatalf("should equal to LOGGER")
}
CustomLogger("TestLogger", "testFuncName")
l = Logger()
if l == LOGGER || l == nil {
- fail(t, "should create a new instance for 'TestLogger'")
+ t.Fatalf("should create a new instance for 'TestLogger'")
}
s := Logger()
if l != s {
- fail(t, "should be the same logger")
+ t.Fatalf("should be the same logger")
}
CustomLogger("github.com/apache/incubator-servicecomb-service-center/pkg/util", "testPkgPath")
l = Logger()
if l == LOGGER || l == nil {
- fail(t, "should create a new instance for 'util'")
+ t.Fatalf("should create a new instance for 'util'")
}
// l.Infof("OK")
}
diff --git a/pkg/util/net_test.go b/pkg/util/net_test.go
index c148406..e4d41b8 100644
--- a/pkg/util/net_test.go
+++ b/pkg/util/net_test.go
@@ -30,40 +30,40 @@ const (
func TestInetAton(t *testing.T) {
i := InetAton(ip1)
if i != 2130706433 {
- fail(t, "InetAton(%s) error", ip1)
+ t.Fatalf("InetAton(%s) error", ip1)
}
i = InetAton(ip2)
if i != 0 {
- fail(t, "InetAton(%s) error", ip2)
+ t.Fatalf("InetAton(%s) error", ip2)
}
i = InetAton(ip3)
if i != 4294967295 {
- fail(t, "InetAton(%s) error", ip3)
+ t.Fatalf("InetAton(%s) error", ip3)
}
}
func TestInetNtoa(t *testing.T) {
ip := InetNtoa(n1)
if ip != ip1 {
- fail(t, "InetNtoa(%d) error", n1)
+ t.Fatalf("InetNtoa(%d) error", n1)
}
ip = InetNtoa(n2)
if ip != ip2 {
- fail(t, "InetNtoa(%d) error", n2)
+ t.Fatalf("InetNtoa(%d) error", n2)
}
ip = InetNtoa(n3)
if ip != ip3 {
- fail(t, "InetNtoa(%d) error", n3)
+ t.Fatalf("InetNtoa(%d) error", n3)
}
}
func TestParseIpPort(t *testing.T) {
ipPort := ParseIpPort("0.0.0.0")
if ipPort.IP != "0.0.0.0" || ipPort.Port != 0 {
- fail(t, "ParseIpPort(0.0.0.0) error", n3)
+ t.Fatalf("ParseIpPort(0.0.0.0) error", n3)
}
ipPort = ParseIpPort("0.0.0.0:1")
if ipPort.IP != "0.0.0.0" || ipPort.Port != 1 {
- fail(t, "ParseIpPort(0.0.0.0) error", n3)
+ t.Fatalf("ParseIpPort(0.0.0.0) error", n3)
}
}
diff --git a/pkg/util/reflect.go b/pkg/util/reflect.go
index a5770f0..6efa1ae 100644
--- a/pkg/util/reflect.go
+++ b/pkg/util/reflect.go
@@ -22,7 +22,11 @@ import (
"unsafe"
)
-var reflector *Reflector
+var (
+ reflector *Reflector
+ sliceTypeSize = uint64(reflect.TypeOf(reflect.SliceHeader{}).Size())
+ stringTypeSize = uint64(reflect.TypeOf(reflect.StringHeader{}).Size())
+)
func init() {
reflector = &Reflector{
@@ -75,3 +79,82 @@ func (r *Reflector) Load(obj interface{}) StructType {
func LoadStruct(obj interface{}) StructType {
return reflector.Load(obj)
}
+
+func Sizeof(obj interface{}) uint64 {
+ selfRecurseMap := make(map[uintptr]struct{})
+ return sizeof(reflect.ValueOf(obj), selfRecurseMap)
+}
+
+func sizeof(v reflect.Value, selfRecurseMap map[uintptr]struct{}) (s uint64) {
+ if !v.IsValid() {
+ return
+ }
+
+ if v.CanAddr() {
+ selfRecurseMap[v.Addr().Pointer()] = struct{}{}
+ }
+
+ t := v.Type()
+ s += uint64(t.Size())
+ switch v.Kind() {
+ case reflect.Ptr:
+ if v.IsNil() {
+ break
+ }
+ if _, ok := selfRecurseMap[v.Pointer()]; ok {
+ break
+ }
+ fallthrough
+ case reflect.Interface:
+ s += sizeof(v.Elem(), selfRecurseMap)
+ case reflect.Struct:
+ s -= uint64(t.Size())
+ for i := 0; i < v.NumField(); i++ {
+ s += sizeof(v.Field(i), selfRecurseMap)
+ }
+ case reflect.Array:
+ if isValueType(t.Elem().Kind()) {
+ break
+ }
+ s -= uint64(t.Size())
+ for i := 0; i < v.Len(); i++ {
+ s += sizeof(v.Index(i), selfRecurseMap)
+ }
+ case reflect.Slice:
+ et := t.Elem()
+ if isValueType(et.Kind()) {
+ s += uint64(v.Len()) * uint64(et.Size())
+ break
+ }
+ for i := 0; i < v.Len(); i++ {
+ s += sizeof(v.Index(i), selfRecurseMap)
+ }
+ case reflect.Map:
+ if v.IsNil() {
+ break
+ }
+ kt, vt := t.Key(), t.Elem()
+ if isValueType(kt.Kind()) && isValueType(vt.Kind()) {
+ s += uint64(kt.Size()+vt.Size()) * uint64(v.Len())
+ break
+ }
+ for _, k := range v.MapKeys() {
+ s += sizeof(k, selfRecurseMap)
+ s += sizeof(v.MapIndex(k), selfRecurseMap)
+ }
+ case reflect.String:
+ s += uint64(v.Len())
+ }
+ return
+}
+
+func isValueType(kind reflect.Kind) bool {
+ switch kind {
+ case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
+ reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr,
+ reflect.Float32, reflect.Float64, reflect.Complex64, reflect.Complex128:
+ return true
+ default:
+ return false
+ }
+}
diff --git a/pkg/util/reflect_test.go b/pkg/util/reflect_test.go
index ae0dfa5..af102e0 100644
--- a/pkg/util/reflect_test.go
+++ b/pkg/util/reflect_test.go
@@ -18,6 +18,7 @@ package util
import (
"fmt"
+ "reflect"
"testing"
)
@@ -35,10 +36,10 @@ func TestLoadStruct(t *testing.T) {
obj1 := testStru{}
v := LoadStruct(obj1)
if v.Type.String() != "util.testStru" {
- fail(t, "TestLoadStruct failed, %s != 'testStru'", v.Type.String())
+ t.Fatalf("TestLoadStruct failed, %s != 'testStru'", v.Type.String())
}
if len(v.Fields) != 4 {
- fail(t, "TestLoadStruct failed, wrong count of fields")
+ t.Fatalf("TestLoadStruct failed, wrong count of fields")
}
for _, f := range v.Fields {
fmt.Println(f.Name, f.Type.String())
@@ -51,10 +52,39 @@ func TestLoadStruct(t *testing.T) {
func BenchmarkLoadStruct(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
- obj := testStru{}
- LoadStruct(obj)
+ LoadStruct(testStru{})
}
})
b.ReportAllocs()
// 20000000 86.9 ns/op 32 B/op 1 allocs/op
}
+
+var (
+ sliceSize = uint64(reflect.TypeOf(reflect.SliceHeader{}).Size())
+ stringSize = uint64(reflect.TypeOf(reflect.StringHeader{}).Size())
+)
+
+type S struct {
+ a int
+ s string
+ p *S
+ m map[int32]uint32
+ u []uint64
+ ua [8]uint64
+ ch chan int
+ i interface{}
+}
+
+func BenchmarkSizeof(b *testing.B) {
+ s := &S{}
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ Sizeof(S{
+ p: s,
+ i: s,
+ })
+ }
+ })
+ b.ReportAllocs()
+ // 2000000 650 ns/op 160 B/op 1 allocs/op
+}
diff --git a/pkg/util/tree_test.go b/pkg/util/tree_test.go
index f650424..e1b7265 100644
--- a/pkg/util/tree_test.go
+++ b/pkg/util/tree_test.go
@@ -46,6 +46,6 @@ func TestTree(t *testing.T) {
testTree.InOrderTraversal(testTree.GetRoot(), handle)
if !reflect.DeepEqual(slice, targetSlice) {
- fail(t, `TestTree failed`)
+ t.Fatalf(`TestTree failed`)
}
}
diff --git a/pkg/util/uniqueue_test.go b/pkg/util/uniqueue_test.go
index 6dca00d..8621bcc 100644
--- a/pkg/util/uniqueue_test.go
+++ b/pkg/util/uniqueue_test.go
@@ -28,19 +28,19 @@ import (
func TestNewUniQueue(t *testing.T) {
_, err := newUniQueue(0)
if err == nil {
- fail(t, "newUniQueue(0) should return error")
+ t.Fatalf("newUniQueue(0) should return error")
}
_, err = newUniQueue(math.MaxInt32)
if err == nil {
- fail(t, "newUniQueue(math.MaxInt32) should return error")
+ t.Fatalf("newUniQueue(math.MaxInt32) should return error")
}
uq, err := newUniQueue(1)
if err != nil || uq == nil {
- fail(t, "newUniQueue(1) should return ok")
+ t.Fatalf("newUniQueue(1) should return ok")
}
uq = NewUniQueue()
if uq == nil {
- fail(t, "NewUniQueue should return ok")
+ t.Fatalf("NewUniQueue should return ok")
}
}
@@ -48,14 +48,14 @@ func TestUniQueue_Close(t *testing.T) {
uq := NewUniQueue()
err := uq.Put(context.Background(), "abc")
if err != nil {
- fail(t, "NewUniQueue should return ok")
+ t.Fatalf("NewUniQueue should return ok")
}
uq.Close()
item := uq.Get(context.Background())
if item != nil {
- fail(t, "Get expect '%v' to 'nil' when queue closed", item)
+ t.Fatalf("Get expect '%v' to 'nil' when queue closed", item)
}
uq.Close()
@@ -69,16 +69,16 @@ func TestUniQueue_Get(t *testing.T) {
start := time.Now()
item := uq.Get(ctx)
if time.Now().Sub(start) < time.Second || item != nil {
- fail(t, "Get should be timed out, result: %v", item)
+ t.Fatalf("Get should be timed out, result: %v", item)
}
err := uq.Put(context.Background(), "abc")
if err != nil {
- fail(t, "Put('abc') should be ok")
+ t.Fatalf("Put('abc') should be ok")
}
err = uq.Put(context.Background(), "efg")
if err != nil {
- fail(t, "Put('efg') should be ok")
+ t.Fatalf("Put('efg') should be ok")
}
time.Sleep(time.Second)
@@ -86,7 +86,7 @@ func TestUniQueue_Get(t *testing.T) {
ctx, _ = context.WithTimeout(context.Background(), time.Second)
item = uq.Get(ctx)
if item == nil || item.(string) != "efg" {
- fail(t, "Get expect '%v' to 'efg'", item)
+ t.Fatalf("Get expect '%v' to 'efg'", item)
}
}
@@ -96,17 +96,17 @@ func TestUniQueue_Put(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
err = uq.Put(ctx, 1)
if err != nil {
- fail(t, "Put(1) should be ok")
+ t.Fatalf("Put(1) should be ok")
}
cancel()
err = uq.Put(ctx, 2)
if err == nil {
- fail(t, "Put(2) should return 'timed out' error ")
+ t.Fatalf("Put(2) should return 'timed out' error ")
}
uq.Close()
err = uq.Put(context.Background(), 3)
if err == nil {
- fail(t, "Put(3) should return 'channel is closed' error")
+ t.Fatalf("Put(3) should return 'channel is closed' error")
}
}
diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go
index 3a54c36..16f342f 100644
--- a/pkg/util/util_test.go
+++ b/pkg/util/util_test.go
@@ -17,16 +17,9 @@
package util
import (
- "fmt"
"testing"
)
-func fail(t *testing.T, format string, args ...interface{}) {
- fmt.Printf(format, args...)
- fmt.Println()
- t.FailNow()
-}
-
func TestBytesToInt32(t *testing.T) {
bs := []byte{0, 0, 0, 1}
i := BytesToInt32(bs)
diff --git a/server/broker/store.go b/server/broker/store.go
index 95c61a4..7ed9c02 100644
--- a/server/broker/store.go
+++ b/server/broker/store.go
@@ -70,7 +70,7 @@ func (s *BKvStore) newStore(t sstore.StoreType, opts ...sstore.KvCacherCfgOption
sstore.WithKey(TypeRoots[t]),
sstore.WithInitSize(s.StoreSize(t)),
)
- s.newIndexer(t, sstore.NewKvCacher(opts...))
+ s.newIndexer(t, sstore.NewKvCacher(t.String(), opts...))
}
func (s *BKvStore) store(ctx context.Context) {
@@ -119,7 +119,7 @@ func (s *BKvStore) newNullStore(t sstore.StoreType) {
}
func (s *BKvStore) newIndexer(t sstore.StoreType, cacher sstore.Cacher) {
- indexer := sstore.NewCacheIndexer(t, cacher)
+ indexer := sstore.NewCacheIndexer(cacher)
s.bindexers[t] = indexer
indexer.Run()
}
diff --git a/server/core/backend/store/cacher.go b/server/core/backend/store/cache_kv.go
similarity index 67%
copy from server/core/backend/store/cacher.go
copy to server/core/backend/store/cache_kv.go
index 39ba733..1b92e99 100644
--- a/server/core/backend/store/cacher.go
+++ b/server/core/backend/store/cache_kv.go
@@ -26,69 +26,6 @@ import (
"time"
)
-const (
- DEFAULT_COMPACT_TIMES = 3
- DEFAULT_COMPACT_TIMEOUT = 5 * time.Minute
- event_block_size = 1000
-)
-
-var (
- NullCache = &nullCache{}
- NullCacher = &nullCacher{}
-)
-
-type Cache interface {
- Version() int64
- Data(interface{}) interface{}
- Have(interface{}) bool
- Size() int
-}
-
-type Cacher interface {
- Cache() Cache
- Run()
- Stop()
- Ready() <-chan struct{}
-}
-
-type nullCache struct {
-}
-
-func (n *nullCache) Version() int64 {
- return 0
-}
-
-func (n *nullCache) Data(interface{}) interface{} {
- return nil
-}
-
-func (n *nullCache) Have(interface{}) bool {
- return false
-}
-
-func (n *nullCache) Size() int {
- return 0
-}
-
-type nullCacher struct {
-}
-
-func (n *nullCacher) Cache() Cache {
- return NullCache
-}
-
-func (n *nullCacher) Run() {}
-
-func (n *nullCacher) Stop() {}
-
-func (n *nullCacher) Ready() <-chan struct{} {
- c := make(chan struct{})
- close(c)
- return c
-}
-
-type KvCacheSafeRFunc func()
-
type KvCache struct {
owner *KvCacher
size int
@@ -153,6 +90,7 @@ func (c *KvCache) Unlock() {
c.lastMaxSize = l
c.lastRefresh = time.Now()
}
+
c.rwMux.Unlock()
}
@@ -166,8 +104,9 @@ func (c *KvCache) Size() (l int) {
type KvCacher struct {
Cfg KvCacherCfg
- lastRev int64
- noEventInterval int
+ name string
+ lastRev int64
+ noEventCount int
ready chan struct{}
lw ListWatcher
@@ -177,74 +116,73 @@ type KvCacher struct {
goroutine *util.GoRoutine
}
+func (c *KvCacher) Name() string {
+ return c.name
+}
+
func (c *KvCacher) needList() bool {
rev := c.lw.Revision()
defer func() { c.lastRev = rev }()
if rev == 0 {
- c.noEventInterval = 0
+ c.noEventCount = 0
return true
}
if c.lastRev != rev {
- c.noEventInterval = 0
+ c.noEventCount = 0
return false
}
- c.noEventInterval++
- if c.noEventInterval < c.Cfg.NoEventMaxInterval {
+ c.noEventCount++
+ if c.noEventCount < c.Cfg.NoEventMaxInterval {
return false
}
- util.Logger().Debugf("no events come in more then %s, need to list key %s",
- time.Duration(c.noEventInterval)*c.Cfg.Timeout, c.Cfg.Key)
- c.noEventInterval = 0
+ util.Logger().Debugf("no events come in more then %s, need to list key %s, rev: %d",
+ time.Duration(c.noEventCount)*c.Cfg.Timeout, c.Cfg.Key, rev)
+ c.noEventCount = 0
return true
}
-func (c *KvCacher) doList(listOps *ListOptions) error {
+func (c *KvCacher) doList(listOps ListOptions) error {
kvs, err := c.lw.List(listOps)
if err != nil {
return err
}
start := time.Now()
- c.lastRev = c.lw.Revision()
- c.sync(c.filter(c.lastRev, kvs))
-
- util.LogDebugOrWarnf(start, "finish to cache key %s, %d items, rev: %d", c.Cfg.Key, len(kvs), c.lastRev)
+ evts := c.filter(c.lw.Revision(), kvs)
+ if ec, kc := len(evts), len(kvs); c.Cfg.DeferHandler != nil && ec == 0 && kc != 0 &&
+ c.Cfg.DeferHandler.Reset() {
+ util.Logger().Warnf(nil, "most of the protected data(%d/%d) are recovered", kc, c.cache.Size())
+ }
+ c.sync(evts)
+ util.LogDebugOrWarnf(start, "finish to cache key %s, %d items, rev: %d", c.Cfg.Key, len(kvs), c.lw.Revision())
return nil
}
-func (c *KvCacher) doWatch(listOps *ListOptions) error {
+func (c *KvCacher) doWatch(listOps ListOptions) error {
watcher := c.lw.Watch(listOps)
return c.handleWatcher(watcher)
}
func (c *KvCacher) ListAndWatch(ctx context.Context) error {
c.mux.Lock()
+ defer c.mux.Unlock()
- listOps := &ListOptions{
+ listOps := ListOptions{
Timeout: c.Cfg.Timeout,
Context: ctx,
}
if c.needList() {
- err := c.doList(listOps)
- if err != nil {
- util.Logger().Errorf(err, "list key %s failed, rev: %d", c.Cfg.Key, c.lastRev)
- // do not return err, continue to watch
+ if err := c.doList(listOps); err != nil {
+ return err
}
- util.SafeCloseChan(c.ready)
}
- err := c.doWatch(listOps)
-
- c.mux.Unlock()
+ util.SafeCloseChan(c.ready)
- if err != nil {
- util.Logger().Errorf(err, "handle watcher failed, watch key %s, start rev: %d+1", c.Cfg.Key, c.lastRev)
- return err
- }
- return nil
+ return c.doWatch(listOps)
}
func (c *KvCacher) handleWatcher(watcher *Watcher) error {
@@ -259,8 +197,8 @@ func (c *KvCacher) handleWatcher(watcher *Watcher) error {
return nil
}
-func (c *KvCacher) needDeferHandle(evts []*Event) bool {
- if c.Cfg.DeferHandler == nil {
+func (c *KvCacher) needDeferHandle(evts []KvEvent) bool {
+ if c.Cfg.DeferHandler == nil || !c.IsReady() {
return false
}
@@ -282,6 +220,9 @@ func (c *KvCacher) refresh(ctx context.Context) {
util.Logger().Debugf("stop to list and watch %s", c.Cfg)
return
case <-time.After(nextPeriod):
+ r := c.cache.RLock()
+ ReportCacheMetrics(c.Name(), "raw", r)
+ c.cache.RUnlock()
}
}
}
@@ -291,7 +232,10 @@ func (c *KvCacher) deferHandle(ctx context.Context) {
return
}
- i, evts := 0, make([]*Event, event_block_size)
+ var (
+ evts [event_block_size]KvEvent
+ i int
+ )
for {
select {
case <-ctx.Done():
@@ -320,7 +264,7 @@ func (c *KvCacher) deferHandle(ctx context.Context) {
}
}
-func (c *KvCacher) sync(evts []*Event) {
+func (c *KvCacher) sync(evts []KvEvent) {
if len(evts) == 0 {
return
}
@@ -332,7 +276,7 @@ func (c *KvCacher) sync(evts []*Event) {
c.onEvents(evts)
}
-func (c *KvCacher) filter(rev int64, items []*mvccpb.KeyValue) []*Event {
+func (c *KvCacher) filter(rev int64, items []*mvccpb.KeyValue) []KvEvent {
store := c.cache.RLock()
defer c.cache.RUnlock()
@@ -351,26 +295,27 @@ func (c *KvCacher) filter(rev int64, items []*mvccpb.KeyValue) []*Event {
newStore[util.BytesToStringWithNoCopy(kv.Key)] = kv
}
filterStopCh := make(chan struct{})
- eventsCh := make(chan [event_block_size]*Event, max/event_block_size+2)
+ eventsCh := make(chan [event_block_size]KvEvent, max/event_block_size+2)
go c.filterDelete(store, newStore, rev, eventsCh, filterStopCh)
go c.filterCreateOrUpdate(store, newStore, rev, eventsCh, filterStopCh)
- evts := make([]*Event, 0, max)
+ evts := make([]KvEvent, 0, max)
for block := range eventsCh {
- for _, evt := range block {
- if evt == nil {
+ for _, e := range block {
+ if e.Object == nil {
break
}
- evts = append(evts, evt)
+ evts = append(evts, e)
}
}
return evts
}
-func (c *KvCacher) filterDelete(store map[string]*mvccpb.KeyValue, newStore map[string]*mvccpb.KeyValue, rev int64, eventsCh chan [event_block_size]*Event, filterStopCh chan struct{}) {
- var block [event_block_size]*Event
+func (c *KvCacher) filterDelete(store map[string]*mvccpb.KeyValue, newStore map[string]*mvccpb.KeyValue,
+ rev int64, eventsCh chan [event_block_size]KvEvent, filterStopCh chan struct{}) {
+ var block [event_block_size]KvEvent
i := 0
for k, v := range store {
_, ok := newStore[k]
@@ -380,11 +325,11 @@ func (c *KvCacher) filterDelete(store map[string]*mvccpb.KeyValue, newStore map[
if i >= event_block_size {
eventsCh <- block
- block = [event_block_size]*Event{}
+ block = [event_block_size]KvEvent{}
i = 0
}
- block[i] = &Event{
+ block[i] = KvEvent{
Revision: rev,
Type: proto.EVT_DELETE,
Prefix: c.Cfg.Key,
@@ -400,19 +345,20 @@ func (c *KvCacher) filterDelete(store map[string]*mvccpb.KeyValue, newStore map[
close(filterStopCh)
}
-func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newStore map[string]*mvccpb.KeyValue, rev int64, eventsCh chan [event_block_size]*Event, filterStopCh chan struct{}) {
- var block [event_block_size]*Event
+func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newStore map[string]*mvccpb.KeyValue,
+ rev int64, eventsCh chan [event_block_size]KvEvent, filterStopCh chan struct{}) {
+ var block [event_block_size]KvEvent
i := 0
for k, v := range newStore {
ov, ok := store[k]
if !ok {
if i >= event_block_size {
eventsCh <- block
- block = [event_block_size]*Event{}
+ block = [event_block_size]KvEvent{}
i = 0
}
- block[i] = &Event{
+ block[i] = KvEvent{
Revision: rev,
Type: proto.EVT_CREATE,
Prefix: c.Cfg.Key,
@@ -422,17 +368,17 @@ func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newSt
continue
}
- if ov.ModRevision >= v.ModRevision {
+ if ov.CreateRevision == v.CreateRevision && ov.ModRevision == v.ModRevision {
continue
}
if i >= event_block_size {
eventsCh <- block
- block = [event_block_size]*Event{}
+ block = [event_block_size]KvEvent{}
i = 0
}
- block[i] = &Event{
+ block[i] = KvEvent{
Revision: rev,
Type: proto.EVT_UPDATE,
Prefix: c.Cfg.Key,
@@ -445,90 +391,73 @@ func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newSt
eventsCh <- block
}
- select {
- case <-filterStopCh:
- close(eventsCh)
- }
+ <-filterStopCh
+
+ close(eventsCh)
}
-func (c *KvCacher) onEvents(evts []*Event) {
- idx, init := 0, !c.IsReady()
- kvEvts := make([]*KvEvent, len(evts))
+func (c *KvCacher) onEvents(evts []KvEvent) {
+ init := !c.IsReady()
store := c.cache.Lock()
- for _, evt := range evts {
+ for i, evt := range evts {
kv := evt.Object.(*mvccpb.KeyValue)
key := util.BytesToStringWithNoCopy(kv.Key)
prevKv, ok := store[key]
switch evt.Type {
case proto.EVT_CREATE, proto.EVT_UPDATE:
- util.Logger().Debugf("sync %s event and notify watcher, cache %v", evt.Type, kv)
-
- t := evt.Type
- if !ok && evt.Type != proto.EVT_CREATE {
+ switch {
+ case init:
+ evt.Type = proto.EVT_INIT
+ case !ok && evt.Type != proto.EVT_CREATE:
util.Logger().Warnf(nil, "unexpected %s event! it should be %s key %s",
evt.Type, proto.EVT_CREATE, key)
- t = proto.EVT_CREATE
- }
-
- if ok && evt.Type != proto.EVT_UPDATE {
+ evt.Type = proto.EVT_CREATE
+ case ok && evt.Type != proto.EVT_UPDATE:
util.Logger().Warnf(nil, "unexpected %s event! it should be %s key %s",
evt.Type, proto.EVT_UPDATE, key)
- t = proto.EVT_UPDATE
+ evt.Type = proto.EVT_UPDATE
}
store[key] = kv
- kvEvts[idx] = &KvEvent{
- Revision: evt.Revision,
- Action: t,
- KV: kv,
- }
+ evts[i] = evt
case proto.EVT_DELETE:
if !ok {
- util.Logger().Warnf(nil, "unexpected %s event! key %s does not exist", evt.Type, key)
- continue
+ util.Logger().Warnf(nil, "unexpected %s event! key %s does not cache", evt.Type, key)
+ } else {
+ delete(store, key)
}
-
- util.Logger().Debugf("sync %s event and notify watcher, remove cache %v", evt.Type, kv)
- delete(store, key)
- kvEvts[idx] = &KvEvent{
- Revision: evt.Revision,
- Action: evt.Type,
- KV: prevKv,
- }
- }
-
- if init && kvEvts[idx].Action == proto.EVT_CREATE {
- kvEvts[idx].Action = proto.EVT_INIT
+ evt.Object = prevKv // maybe nil
+ evts[i] = evt
}
-
- idx++
}
+
c.cache.Unlock()
- c.onKvEvents(kvEvts[:idx])
+ c.onKvEvents(evts)
}
-func (c *KvCacher) onKvEvents(evts []*KvEvent) {
+func (c *KvCacher) onKvEvents(evts []KvEvent) {
if c.Cfg.OnEvent == nil {
return
}
for _, evt := range evts {
+ if evt.Object == nil {
+ continue
+ }
c.Cfg.OnEvent(evt)
}
}
-func (c *KvCacher) run() {
- c.goroutine.Do(c.refresh)
- c.goroutine.Do(c.deferHandle)
-}
-
func (c *KvCacher) Cache() Cache {
return c.cache
}
func (c *KvCacher) Run() {
- c.once.Do(c.run)
+ c.once.Do(func() {
+ c.goroutine.Do(c.refresh)
+ c.goroutine.Do(c.deferHandle)
+ })
}
func (c *KvCacher) Stop() {
@@ -559,13 +488,14 @@ func NewKvCache(c *KvCacher, size int) *KvCache {
}
}
-func NewKvCacher(opts ...KvCacherCfgOption) *KvCacher {
+func NewKvCacher(name string, opts ...KvCacherCfgOption) *KvCacher {
cfg := DefaultKvCacherConfig()
for _, opt := range opts {
opt(&cfg)
}
cacher := &KvCacher{
+ name: name,
Cfg: cfg,
ready: make(chan struct{}),
lw: ListWatcher{
diff --git a/pkg/util/util_test.go b/server/core/backend/store/cache_null.go
similarity index 55%
copy from pkg/util/util_test.go
copy to server/core/backend/store/cache_null.go
index 3a54c36..2025bff 100644
--- a/pkg/util/util_test.go
+++ b/server/core/backend/store/cache_null.go
@@ -14,47 +14,52 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package util
+package store
-import (
- "fmt"
- "testing"
+var (
+ NullCache = &nullCache{}
+ NullCacher = &nullCacher{}
+ closedCh = make(chan struct{})
)
-func fail(t *testing.T, format string, args ...interface{}) {
- fmt.Printf(format, args...)
- fmt.Println()
- t.FailNow()
-}
-
-func TestBytesToInt32(t *testing.T) {
- bs := []byte{0, 0, 0, 1}
- i := BytesToInt32(bs)
- if i != 1 {
- t.FailNow()
- }
-
- bs = []byte{1, 0, 0, 0}
- i = BytesToInt32(bs)
- if i != 1<<(3*8) {
- t.FailNow()
- }
-
- bs = []byte{0, 0, 0, 0, 1}
- i = BytesToInt32(bs)
- if i != 0 {
- t.FailNow()
- }
-
- bs = []byte{1}
- i = BytesToInt32(bs)
- if i != 1 {
- t.FailNow()
- }
-
- bs = []byte{1, 0}
- i = BytesToInt32(bs)
- if i != 1<<8 {
- t.FailNow()
- }
+func init() {
+ close(closedCh)
+}
+
+type nullCache struct {
+}
+
+func (n *nullCache) Version() int64 {
+ return 0
+}
+
+func (n *nullCache) Data(interface{}) interface{} {
+ return nil
+}
+
+func (n *nullCache) Have(interface{}) bool {
+ return false
+}
+
+func (n *nullCache) Size() int {
+ return 0
+}
+
+type nullCacher struct {
+}
+
+func (n *nullCacher) Name() string {
+ return ""
+}
+
+func (n *nullCacher) Cache() Cache {
+ return NullCache
+}
+
+func (n *nullCacher) Run() {}
+
+func (n *nullCacher) Stop() {}
+
+func (n *nullCacher) Ready() <-chan struct{} {
+ return closedCh
}
diff --git a/server/core/backend/store/cacher.go b/server/core/backend/store/cacher.go
index 39ba733..94fbe46 100644
--- a/server/core/backend/store/cacher.go
+++ b/server/core/backend/store/cacher.go
@@ -16,27 +16,6 @@
*/
package store
-import (
- "github.com/apache/incubator-servicecomb-service-center/pkg/util"
- "github.com/apache/incubator-servicecomb-service-center/server/core/backend"
- "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
- "github.com/coreos/etcd/mvcc/mvccpb"
- "golang.org/x/net/context"
- "sync"
- "time"
-)
-
-const (
- DEFAULT_COMPACT_TIMES = 3
- DEFAULT_COMPACT_TIMEOUT = 5 * time.Minute
- event_block_size = 1000
-)
-
-var (
- NullCache = &nullCache{}
- NullCacher = &nullCacher{}
-)
-
type Cache interface {
Version() int64
Data(interface{}) interface{}
@@ -45,535 +24,9 @@ type Cache interface {
}
type Cacher interface {
+ Name() string
Cache() Cache
Run()
Stop()
Ready() <-chan struct{}
}
-
-type nullCache struct {
-}
-
-func (n *nullCache) Version() int64 {
- return 0
-}
-
-func (n *nullCache) Data(interface{}) interface{} {
- return nil
-}
-
-func (n *nullCache) Have(interface{}) bool {
- return false
-}
-
-func (n *nullCache) Size() int {
- return 0
-}
-
-type nullCacher struct {
-}
-
-func (n *nullCacher) Cache() Cache {
- return NullCache
-}
-
-func (n *nullCacher) Run() {}
-
-func (n *nullCacher) Stop() {}
-
-func (n *nullCacher) Ready() <-chan struct{} {
- c := make(chan struct{})
- close(c)
- return c
-}
-
-type KvCacheSafeRFunc func()
-
-type KvCache struct {
- owner *KvCacher
- size int
- store map[string]*mvccpb.KeyValue
- rwMux sync.RWMutex
- lastRefresh time.Time
- lastMaxSize int
-}
-
-func (c *KvCache) Version() int64 {
- return c.owner.lw.Revision()
-}
-
-func (c *KvCache) Data(k interface{}) interface{} {
- c.rwMux.RLock()
- kv, ok := c.store[k.(string)]
- c.rwMux.RUnlock()
- if !ok {
- return nil
- }
- copied := *kv
- return &copied
-}
-
-func (c *KvCache) Have(k interface{}) (ok bool) {
- c.rwMux.RLock()
- _, ok = c.store[k.(string)]
- c.rwMux.RUnlock()
- return
-}
-
-func (c *KvCache) RLock() map[string]*mvccpb.KeyValue {
- c.rwMux.RLock()
- return c.store
-}
-
-func (c *KvCache) RUnlock() {
- c.rwMux.RUnlock()
-}
-
-func (c *KvCache) Lock() map[string]*mvccpb.KeyValue {
- c.rwMux.Lock()
- return c.store
-}
-
-func (c *KvCache) Unlock() {
- l := len(c.store)
- if l > c.lastMaxSize {
- c.lastMaxSize = l
- }
- if c.size >= l &&
- c.lastMaxSize > c.size*DEFAULT_COMPACT_TIMES &&
- time.Now().Sub(c.lastRefresh) >= DEFAULT_COMPACT_TIMEOUT {
- util.Logger().Infof("cache %s is not in use over %s, compact capacity to size %d->%d",
- c.owner.Cfg.Key, DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize, c.size)
- // gc
- newCache := make(map[string]*mvccpb.KeyValue, c.size)
- for k, v := range c.store {
- newCache[k] = v
- }
- c.store = newCache
- c.lastMaxSize = l
- c.lastRefresh = time.Now()
- }
- c.rwMux.Unlock()
-}
-
-func (c *KvCache) Size() (l int) {
- c.rwMux.RLock()
- l = len(c.store)
- c.rwMux.RUnlock()
- return
-}
-
-type KvCacher struct {
- Cfg KvCacherCfg
-
- lastRev int64
- noEventInterval int
-
- ready chan struct{}
- lw ListWatcher
- mux sync.Mutex
- once sync.Once
- cache *KvCache
- goroutine *util.GoRoutine
-}
-
-func (c *KvCacher) needList() bool {
- rev := c.lw.Revision()
- defer func() { c.lastRev = rev }()
-
- if rev == 0 {
- c.noEventInterval = 0
- return true
- }
- if c.lastRev != rev {
- c.noEventInterval = 0
- return false
- }
- c.noEventInterval++
- if c.noEventInterval < c.Cfg.NoEventMaxInterval {
- return false
- }
-
- util.Logger().Debugf("no events come in more then %s, need to list key %s",
- time.Duration(c.noEventInterval)*c.Cfg.Timeout, c.Cfg.Key)
- c.noEventInterval = 0
- return true
-}
-
-func (c *KvCacher) doList(listOps *ListOptions) error {
- kvs, err := c.lw.List(listOps)
- if err != nil {
- return err
- }
-
- start := time.Now()
- c.lastRev = c.lw.Revision()
- c.sync(c.filter(c.lastRev, kvs))
-
- util.LogDebugOrWarnf(start, "finish to cache key %s, %d items, rev: %d", c.Cfg.Key, len(kvs), c.lastRev)
-
- return nil
-}
-
-func (c *KvCacher) doWatch(listOps *ListOptions) error {
- watcher := c.lw.Watch(listOps)
- return c.handleWatcher(watcher)
-}
-
-func (c *KvCacher) ListAndWatch(ctx context.Context) error {
- c.mux.Lock()
-
- listOps := &ListOptions{
- Timeout: c.Cfg.Timeout,
- Context: ctx,
- }
- if c.needList() {
- err := c.doList(listOps)
- if err != nil {
- util.Logger().Errorf(err, "list key %s failed, rev: %d", c.Cfg.Key, c.lastRev)
- // do not return err, continue to watch
- }
- util.SafeCloseChan(c.ready)
- }
-
- err := c.doWatch(listOps)
-
- c.mux.Unlock()
-
- if err != nil {
- util.Logger().Errorf(err, "handle watcher failed, watch key %s, start rev: %d+1", c.Cfg.Key, c.lastRev)
- return err
- }
- return nil
-}
-
-func (c *KvCacher) handleWatcher(watcher *Watcher) error {
- defer watcher.Stop()
- for evts := range watcher.EventBus() {
- if evts[0].Type == proto.EVT_ERROR {
- err := evts[0].Object.(error)
- return err
- }
- c.sync(evts)
- }
- return nil
-}
-
-func (c *KvCacher) needDeferHandle(evts []*Event) bool {
- if c.Cfg.DeferHandler == nil {
- return false
- }
-
- return c.Cfg.DeferHandler.OnCondition(c.Cache(), evts)
-}
-
-func (c *KvCacher) refresh(ctx context.Context) {
- util.Logger().Debugf("start to list and watch %s", c.Cfg)
- for {
- start := time.Now()
- c.ListAndWatch(ctx)
- watchDuration := time.Since(start)
- nextPeriod := c.Cfg.Period
- if watchDuration > 0 && c.Cfg.Period > watchDuration {
- nextPeriod = c.Cfg.Period - watchDuration
- }
- select {
- case <-ctx.Done():
- util.Logger().Debugf("stop to list and watch %s", c.Cfg)
- return
- case <-time.After(nextPeriod):
- }
- }
-}
-
-func (c *KvCacher) deferHandle(ctx context.Context) {
- if c.Cfg.DeferHandler == nil {
- return
- }
-
- i, evts := 0, make([]*Event, event_block_size)
- for {
- select {
- case <-ctx.Done():
- return
- case evt, ok := <-c.Cfg.DeferHandler.HandleChan():
- if !ok {
- <-time.After(time.Second)
- continue
- }
-
- if i >= event_block_size {
- c.onEvents(evts[:i])
- i = 0
- }
-
- evts[i] = evt
- i++
- case <-time.After(300 * time.Millisecond):
- if i == 0 {
- continue
- }
-
- c.onEvents(evts[:i])
- i = 0
- }
- }
-}
-
-func (c *KvCacher) sync(evts []*Event) {
- if len(evts) == 0 {
- return
- }
-
- if c.needDeferHandle(evts) {
- return
- }
-
- c.onEvents(evts)
-}
-
-func (c *KvCacher) filter(rev int64, items []*mvccpb.KeyValue) []*Event {
- store := c.cache.RLock()
- defer c.cache.RUnlock()
-
- oc, nc := len(store), len(items)
- tc := oc + nc
- if tc == 0 {
- return nil
- }
- max := oc
- if nc > oc {
- max = nc
- }
-
- newStore := make(map[string]*mvccpb.KeyValue, nc)
- for _, kv := range items {
- newStore[util.BytesToStringWithNoCopy(kv.Key)] = kv
- }
- filterStopCh := make(chan struct{})
- eventsCh := make(chan [event_block_size]*Event, max/event_block_size+2)
-
- go c.filterDelete(store, newStore, rev, eventsCh, filterStopCh)
-
- go c.filterCreateOrUpdate(store, newStore, rev, eventsCh, filterStopCh)
-
- evts := make([]*Event, 0, max)
- for block := range eventsCh {
- for _, evt := range block {
- if evt == nil {
- break
- }
- evts = append(evts, evt)
- }
- }
- return evts
-}
-
-func (c *KvCacher) filterDelete(store map[string]*mvccpb.KeyValue, newStore map[string]*mvccpb.KeyValue, rev int64, eventsCh chan [event_block_size]*Event, filterStopCh chan struct{}) {
- var block [event_block_size]*Event
- i := 0
- for k, v := range store {
- _, ok := newStore[k]
- if ok {
- continue
- }
-
- if i >= event_block_size {
- eventsCh <- block
- block = [event_block_size]*Event{}
- i = 0
- }
-
- block[i] = &Event{
- Revision: rev,
- Type: proto.EVT_DELETE,
- Prefix: c.Cfg.Key,
- Object: v,
- }
- i++
- }
-
- if i > 0 {
- eventsCh <- block
- }
-
- close(filterStopCh)
-}
-
-func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newStore map[string]*mvccpb.KeyValue, rev int64, eventsCh chan [event_block_size]*Event, filterStopCh chan struct{}) {
- var block [event_block_size]*Event
- i := 0
- for k, v := range newStore {
- ov, ok := store[k]
- if !ok {
- if i >= event_block_size {
- eventsCh <- block
- block = [event_block_size]*Event{}
- i = 0
- }
-
- block[i] = &Event{
- Revision: rev,
- Type: proto.EVT_CREATE,
- Prefix: c.Cfg.Key,
- Object: v,
- }
- i++
- continue
- }
-
- if ov.ModRevision >= v.ModRevision {
- continue
- }
-
- if i >= event_block_size {
- eventsCh <- block
- block = [event_block_size]*Event{}
- i = 0
- }
-
- block[i] = &Event{
- Revision: rev,
- Type: proto.EVT_UPDATE,
- Prefix: c.Cfg.Key,
- Object: v,
- }
- i++
- }
-
- if i > 0 {
- eventsCh <- block
- }
-
- select {
- case <-filterStopCh:
- close(eventsCh)
- }
-}
-
-func (c *KvCacher) onEvents(evts []*Event) {
- idx, init := 0, !c.IsReady()
- kvEvts := make([]*KvEvent, len(evts))
- store := c.cache.Lock()
- for _, evt := range evts {
- kv := evt.Object.(*mvccpb.KeyValue)
- key := util.BytesToStringWithNoCopy(kv.Key)
- prevKv, ok := store[key]
-
- switch evt.Type {
- case proto.EVT_CREATE, proto.EVT_UPDATE:
- util.Logger().Debugf("sync %s event and notify watcher, cache %v", evt.Type, kv)
-
- t := evt.Type
- if !ok && evt.Type != proto.EVT_CREATE {
- util.Logger().Warnf(nil, "unexpected %s event! it should be %s key %s",
- evt.Type, proto.EVT_CREATE, key)
- t = proto.EVT_CREATE
- }
-
- if ok && evt.Type != proto.EVT_UPDATE {
- util.Logger().Warnf(nil, "unexpected %s event! it should be %s key %s",
- evt.Type, proto.EVT_UPDATE, key)
- t = proto.EVT_UPDATE
- }
-
- store[key] = kv
- kvEvts[idx] = &KvEvent{
- Revision: evt.Revision,
- Action: t,
- KV: kv,
- }
- case proto.EVT_DELETE:
- if !ok {
- util.Logger().Warnf(nil, "unexpected %s event! key %s does not exist", evt.Type, key)
- continue
- }
-
- util.Logger().Debugf("sync %s event and notify watcher, remove cache %v", evt.Type, kv)
- delete(store, key)
- kvEvts[idx] = &KvEvent{
- Revision: evt.Revision,
- Action: evt.Type,
- KV: prevKv,
- }
- }
-
- if init && kvEvts[idx].Action == proto.EVT_CREATE {
- kvEvts[idx].Action = proto.EVT_INIT
- }
-
- idx++
- }
- c.cache.Unlock()
-
- c.onKvEvents(kvEvts[:idx])
-}
-
-func (c *KvCacher) onKvEvents(evts []*KvEvent) {
- if c.Cfg.OnEvent == nil {
- return
- }
- for _, evt := range evts {
- c.Cfg.OnEvent(evt)
- }
-}
-
-func (c *KvCacher) run() {
- c.goroutine.Do(c.refresh)
- c.goroutine.Do(c.deferHandle)
-}
-
-func (c *KvCacher) Cache() Cache {
- return c.cache
-}
-
-func (c *KvCacher) Run() {
- c.once.Do(c.run)
-}
-
-func (c *KvCacher) Stop() {
- c.goroutine.Close(true)
-
- util.SafeCloseChan(c.ready)
-}
-
-func (c *KvCacher) Ready() <-chan struct{} {
- return c.ready
-}
-
-func (c *KvCacher) IsReady() bool {
- select {
- case <-c.ready:
- return true
- default:
- return false
- }
-}
-
-func NewKvCache(c *KvCacher, size int) *KvCache {
- return &KvCache{
- owner: c,
- size: size,
- store: make(map[string]*mvccpb.KeyValue, size),
- lastRefresh: time.Now(),
- }
-}
-
-func NewKvCacher(opts ...KvCacherCfgOption) *KvCacher {
- cfg := DefaultKvCacherConfig()
- for _, opt := range opts {
- opt(&cfg)
- }
-
- cacher := &KvCacher{
- Cfg: cfg,
- ready: make(chan struct{}),
- lw: ListWatcher{
- Client: backend.Registry(),
- Key: cfg.Key,
- },
- goroutine: util.NewGo(context.Background()),
- }
- cacher.cache = NewKvCache(cacher, cfg.InitSize)
- return cacher
-}
diff --git a/server/core/backend/store/cacher_test.go b/server/core/backend/store/cacher_test.go
new file mode 100644
index 0000000..3225757
--- /dev/null
+++ b/server/core/backend/store/cacher_test.go
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package store
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+ pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+ "github.com/coreos/etcd/mvcc/mvccpb"
+ "math/rand"
+ "testing"
+)
+
+func BenchmarkFilter(b *testing.B) {
+ inst := &pb.MicroServiceInstance{
+ HealthCheck: &pb.HealthCheck{
+ Interval: 4,
+ Times: 0,
+ },
+ }
+ v, _ := json.Marshal(inst)
+
+ cacher := &KvCacher{}
+
+ n := 300 * 1000 // 30w
+ cache := NewKvCache(cacher, n)
+ items := make([]*mvccpb.KeyValue, 0, n)
+ for ; n > 0; n-- {
+ k := fmt.Sprintf("/%d", n)
+ if n <= 10*1000 {
+ // create
+ items = append(items, &mvccpb.KeyValue{
+ Key: util.StringToBytesWithNoCopy(k),
+ Value: v,
+ ModRevision: int64(rand.Int()),
+ })
+ } else if n > 100*1000 && n <= 20*1000 {
+ // update
+ cache.store[k] = &mvccpb.KeyValue{
+ Key: util.StringToBytesWithNoCopy(k),
+ Value: v,
+ ModRevision: 1,
+ }
+ items = append(items, &mvccpb.KeyValue{
+ Key: util.StringToBytesWithNoCopy(k),
+ Value: v,
+ ModRevision: int64(rand.Int()),
+ })
+ } else {
+ // delete
+ cache.store[k] = &mvccpb.KeyValue{
+ Key: util.StringToBytesWithNoCopy(k),
+ Value: v,
+ ModRevision: 1,
+ }
+ }
+ }
+ cacher.cache = cache
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ cacher.filter(1, items)
+ }
+ b.ReportAllocs()
+
+ // TODO bad performance!!!
+ //10 120612060 ns/op 37128035 B/op 134 allocs/op
+ //
+}
diff --git a/server/core/backend/store/common.go b/server/core/backend/store/common.go
new file mode 100644
index 0000000..16ba78b
--- /dev/null
+++ b/server/core/backend/store/common.go
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package store
+
+import (
+ apt "github.com/apache/incubator-servicecomb-service-center/server/core"
+ "strconv"
+ "time"
+)
+
+type StoreType int
+
+func (st StoreType) String() string {
+ if int(st) < len(TypeNames) {
+ return TypeNames[st]
+ }
+ return "TYPE" + strconv.Itoa(int(st))
+}
+
+const (
+ DOMAIN StoreType = iota
+ PROJECT
+ SERVICE
+ SERVICE_INDEX
+ SERVICE_ALIAS
+ SERVICE_TAG
+ RULE
+ RULE_INDEX
+ DEPENDENCY
+ DEPENDENCY_RULE
+ DEPENDENCY_QUEUE
+ SCHEMA // big data should not be stored in memory.
+ SCHEMA_SUMMARY
+ INSTANCE
+ LEASE
+ ENDPOINTS
+ typeEnd
+)
+
+var TypeNames = []string{
+ SERVICE: "SERVICE",
+ INSTANCE: "INSTANCE",
+ DOMAIN: "DOMAIN",
+ SCHEMA: "SCHEMA",
+ SCHEMA_SUMMARY: "SCHEMA_SUMMARY",
+ RULE: "RULE",
+ LEASE: "LEASE",
+ SERVICE_INDEX: "SERVICE_INDEX",
+ SERVICE_ALIAS: "SERVICE_ALIAS",
+ SERVICE_TAG: "SERVICE_TAG",
+ RULE_INDEX: "RULE_INDEX",
+ DEPENDENCY: "DEPENDENCY",
+ DEPENDENCY_RULE: "DEPENDENCY_RULE",
+ DEPENDENCY_QUEUE: "DEPENDENCY_QUEUE",
+ PROJECT: "PROJECT",
+ ENDPOINTS: "ENDPOINTS",
+}
+
+var TypeRoots = map[StoreType]string{
+ SERVICE: apt.GetServiceRootKey(""),
+ INSTANCE: apt.GetInstanceRootKey(""),
+ DOMAIN: apt.GetDomainRootKey() + "/",
+ SCHEMA: apt.GetServiceSchemaRootKey(""),
+ SCHEMA_SUMMARY: apt.GetServiceSchemaSummaryRootKey(""),
+ RULE: apt.GetServiceRuleRootKey(""),
+ LEASE: apt.GetInstanceLeaseRootKey(""),
+ SERVICE_INDEX: apt.GetServiceIndexRootKey(""),
+ SERVICE_ALIAS: apt.GetServiceAliasRootKey(""),
+ SERVICE_TAG: apt.GetServiceTagRootKey(""),
+ RULE_INDEX: apt.GetServiceRuleIndexRootKey(""),
+ DEPENDENCY: apt.GetServiceDependencyRootKey(""),
+ DEPENDENCY_RULE: apt.GetServiceDependencyRuleRootKey(""),
+ DEPENDENCY_QUEUE: apt.GetServiceDependencyQueueRootKey(""),
+ PROJECT: apt.GetProjectRootKey(""),
+ ENDPOINTS: apt.GetEndpointsRootKey(""),
+}
+
+var TypeInitSize = map[StoreType]int{
+ SERVICE: 500,
+ INSTANCE: 1000,
+ DOMAIN: 100,
+ SCHEMA: 0,
+ SCHEMA_SUMMARY: 100,
+ RULE: 100,
+ LEASE: 1000,
+ SERVICE_INDEX: 500,
+ SERVICE_ALIAS: 100,
+ SERVICE_TAG: 100,
+ RULE_INDEX: 100,
+ DEPENDENCY: 100,
+ DEPENDENCY_RULE: 100,
+ DEPENDENCY_QUEUE: 100,
+ PROJECT: 100,
+ ENDPOINTS: 1000,
+}
+
+const (
+ DEFAULT_MAX_NO_EVENT_INTERVAL = 1 // TODO it should be set to 1 for prevent etcd data is lost accidentally.
+ DEFAULT_LISTWATCH_TIMEOUT = 30 * time.Second
+ DEFAULT_SELF_PRESERVATION_PERCENT = 0.8
+ DEFAULT_CACHE_INIT_SIZE = 100
+)
+
+const (
+ DEFAULT_COMPACT_TIMES = 3
+ DEFAULT_COMPACT_TIMEOUT = 5 * time.Minute
+ event_block_size = 1000
+)
+
+const (
+ DEFAULT_MAX_EVENT_COUNT = 1000
+ DEFAULT_ADD_QUEUE_TIMEOUT = 5 * time.Second
+)
+
+const DEFAULT_CHECK_WINDOW = 2 * time.Second // instance DELETE event will be delay.
+
+const TIME_FORMAT = "15:04:05.000"
+
+const EVENT_BUS_MAX_SIZE = 1000
diff --git a/server/core/backend/store/defer.go b/server/core/backend/store/defer.go
index 43464f0..da75b21 100644
--- a/server/core/backend/store/defer.go
+++ b/server/core/backend/store/defer.go
@@ -16,142 +16,8 @@
*/
package store
-import (
- "encoding/json"
- "github.com/apache/incubator-servicecomb-service-center/pkg/util"
- pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
- "github.com/coreos/etcd/mvcc/mvccpb"
- "golang.org/x/net/context"
- "sync"
- "time"
-)
-
type DeferHandler interface {
- OnCondition(Cache, []*Event) bool
- HandleChan() <-chan *Event
-}
-
-type deferItem struct {
- ttl *time.Timer
- event *Event
-}
-
-type InstanceEventDeferHandler struct {
- Percent float64
-
- cache Cache
- once sync.Once
- enabled bool
- items map[string]*deferItem
- pendingCh chan []*Event
- deferCh chan *Event
-}
-
-func (iedh *InstanceEventDeferHandler) OnCondition(cache Cache, evts []*Event) bool {
- if iedh.Percent <= 0 {
- return false
- }
-
- iedh.once.Do(func() {
- iedh.cache = cache
- iedh.items = make(map[string]*deferItem, event_block_size)
- iedh.pendingCh = make(chan []*Event, event_block_size)
- iedh.deferCh = make(chan *Event, event_block_size)
- util.Go(iedh.check)
- })
-
- iedh.pendingCh <- evts
- return true
-}
-
-func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt *Event) error {
- kv := evt.Object.(*mvccpb.KeyValue)
- key := util.BytesToStringWithNoCopy(kv.Key)
- _, ok := iedh.items[key]
- switch evt.Type {
- case pb.EVT_CREATE, pb.EVT_UPDATE:
- if ok {
- util.Logger().Infof("recovered key %s events", key)
- // return nil // no need to publish event to subscribers?
- }
- iedh.recover(evt)
- case pb.EVT_DELETE:
- if ok {
- return nil
- }
-
- var instance pb.MicroServiceInstance
- err := json.Unmarshal(kv.Value, &instance)
- if err != nil {
- util.Logger().Errorf(err, "unmarshal instance file failed, key is %s", key)
- return err
- }
- iedh.items[key] = &deferItem{
- ttl: time.NewTimer(
- time.Duration(instance.HealthCheck.Interval*(instance.HealthCheck.Times+1)) * time.Second),
- event: evt,
- }
- }
- return nil
-}
-
-func (iedh *InstanceEventDeferHandler) HandleChan() <-chan *Event {
- return iedh.deferCh
-}
-
-func (iedh *InstanceEventDeferHandler) check(ctx context.Context) {
- defer util.RecoverAndReport()
- t, n := iedh.newTimer(), false
- for {
- select {
- case <-ctx.Done():
- return
- case evts := <-iedh.pendingCh:
- for _, evt := range evts {
- iedh.recoverOrDefer(evt)
- }
-
- del := len(iedh.items)
- if del > 0 && !n {
- t.Stop()
- t, n = iedh.newTimer(), true
- }
-
- total := iedh.cache.Size()
- if !iedh.enabled && del > 0 && total > 5 && float64(del) >= float64(total)*iedh.Percent {
- iedh.enabled = true
- util.Logger().Warnf(nil, "self preservation is enabled, caught %d/%d(>=%.0f%%) DELETE events",
- del, total, iedh.Percent*100)
- }
- case <-t.C:
- t, n = iedh.newTimer(), false
-
- for key, item := range iedh.items {
- if iedh.enabled {
- select {
- case <-item.ttl.C:
- default:
- continue
- }
- util.Logger().Warnf(nil, "defer handle timed out, removed key is %s", key)
- }
- iedh.recover(item.event)
- }
-
- if iedh.enabled && len(iedh.items) == 0 {
- iedh.enabled = false
- util.Logger().Warnf(nil, "self preservation is stopped")
- }
- }
- }
-}
-
-func (iedh *InstanceEventDeferHandler) newTimer() *time.Timer {
- return time.NewTimer(2 * time.Second) // instance DELETE event will be delay.
-}
-
-func (iedh *InstanceEventDeferHandler) recover(evt *Event) {
- key := util.BytesToStringWithNoCopy(evt.Object.(*mvccpb.KeyValue).Key)
- delete(iedh.items, key)
- iedh.deferCh <- evt
+ OnCondition(Cache, []KvEvent) bool
+ HandleChan() <-chan KvEvent
+ Reset() bool
}
diff --git a/server/core/backend/store/defer.go b/server/core/backend/store/defer_instance.go
similarity index 73%
copy from server/core/backend/store/defer.go
copy to server/core/backend/store/defer_instance.go
index 43464f0..519b773 100644
--- a/server/core/backend/store/defer.go
+++ b/server/core/backend/store/defer_instance.go
@@ -26,14 +26,11 @@ import (
"time"
)
-type DeferHandler interface {
- OnCondition(Cache, []*Event) bool
- HandleChan() <-chan *Event
-}
+var defInstanceEventDeferHandler = &InstanceEventDeferHandler{Percent: DEFAULT_SELF_PRESERVATION_PERCENT}
type deferItem struct {
ttl *time.Timer
- event *Event
+ event KvEvent
}
type InstanceEventDeferHandler struct {
@@ -42,21 +39,23 @@ type InstanceEventDeferHandler struct {
cache Cache
once sync.Once
enabled bool
- items map[string]*deferItem
- pendingCh chan []*Event
- deferCh chan *Event
+ items map[string]deferItem
+ pendingCh chan []KvEvent
+ deferCh chan KvEvent
+ resetCh chan struct{}
}
-func (iedh *InstanceEventDeferHandler) OnCondition(cache Cache, evts []*Event) bool {
+func (iedh *InstanceEventDeferHandler) OnCondition(cache Cache, evts []KvEvent) bool {
if iedh.Percent <= 0 {
return false
}
iedh.once.Do(func() {
iedh.cache = cache
- iedh.items = make(map[string]*deferItem, event_block_size)
- iedh.pendingCh = make(chan []*Event, event_block_size)
- iedh.deferCh = make(chan *Event, event_block_size)
+ iedh.items = make(map[string]deferItem, event_block_size)
+ iedh.pendingCh = make(chan []KvEvent, event_block_size)
+ iedh.deferCh = make(chan KvEvent, event_block_size)
+ iedh.resetCh = make(chan struct{})
util.Go(iedh.check)
})
@@ -64,7 +63,7 @@ func (iedh *InstanceEventDeferHandler) OnCondition(cache Cache, evts []*Event) b
return true
}
-func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt *Event) error {
+func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt KvEvent) error {
kv := evt.Object.(*mvccpb.KeyValue)
key := util.BytesToStringWithNoCopy(kv.Key)
_, ok := iedh.items[key]
@@ -86,7 +85,7 @@ func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt *Event) error {
util.Logger().Errorf(err, "unmarshal instance file failed, key is %s", key)
return err
}
- iedh.items[key] = &deferItem{
+ iedh.items[key] = deferItem{
ttl: time.NewTimer(
time.Duration(instance.HealthCheck.Interval*(instance.HealthCheck.Times+1)) * time.Second),
event: evt,
@@ -95,13 +94,13 @@ func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt *Event) error {
return nil
}
-func (iedh *InstanceEventDeferHandler) HandleChan() <-chan *Event {
+func (iedh *InstanceEventDeferHandler) HandleChan() <-chan KvEvent {
return iedh.deferCh
}
func (iedh *InstanceEventDeferHandler) check(ctx context.Context) {
defer util.RecoverAndReport()
- t, n := iedh.newTimer(), false
+ t, n := time.NewTimer(DEFAULT_CHECK_WINDOW), false
for {
select {
case <-ctx.Done():
@@ -113,8 +112,11 @@ func (iedh *InstanceEventDeferHandler) check(ctx context.Context) {
del := len(iedh.items)
if del > 0 && !n {
- t.Stop()
- t, n = iedh.newTimer(), true
+ if !t.Stop() {
+ <-t.C
+ }
+ t.Reset(DEFAULT_CHECK_WINDOW)
+ n = true
}
total := iedh.cache.Size()
@@ -124,7 +126,8 @@ func (iedh *InstanceEventDeferHandler) check(ctx context.Context) {
del, total, iedh.Percent*100)
}
case <-t.C:
- t, n = iedh.newTimer(), false
+ t.Reset(DEFAULT_CHECK_WINDOW)
+ n = false
for key, item := range iedh.items {
if iedh.enabled {
@@ -142,16 +145,24 @@ func (iedh *InstanceEventDeferHandler) check(ctx context.Context) {
iedh.enabled = false
util.Logger().Warnf(nil, "self preservation is stopped")
}
+ case <-iedh.resetCh:
+ iedh.enabled = false
+ iedh.items = make(map[string]deferItem, event_block_size)
+ util.Logger().Warnf(nil, "self preservation is reset")
}
}
}
-func (iedh *InstanceEventDeferHandler) newTimer() *time.Timer {
- return time.NewTimer(2 * time.Second) // instance DELETE event will be delay.
-}
-
-func (iedh *InstanceEventDeferHandler) recover(evt *Event) {
+func (iedh *InstanceEventDeferHandler) recover(evt KvEvent) {
key := util.BytesToStringWithNoCopy(evt.Object.(*mvccpb.KeyValue).Key)
delete(iedh.items, key)
iedh.deferCh <- evt
}
+
+func (iedh *InstanceEventDeferHandler) Reset() bool {
+ if iedh.enabled {
+ iedh.resetCh <- struct{}{}
+ return true
+ }
+ return false
+}
diff --git a/server/core/backend/store/defer_test.go b/server/core/backend/store/defer_test.go
index 2c9c453..4a034a6 100644
--- a/server/core/backend/store/defer_test.go
+++ b/server/core/backend/store/defer_test.go
@@ -32,14 +32,12 @@ func TestInstanceEventDeferHandler_OnCondition(t *testing.T) {
}
if iedh.OnCondition(nil, nil) {
- fmt.Printf(`TestInstanceEventDeferHandler_OnCondition with 0%% failed`)
- t.FailNow()
+ t.Fatalf(`TestInstanceEventDeferHandler_OnCondition with 0%% failed`)
}
iedh.Percent = 0.01
if !iedh.OnCondition(nil, nil) {
- fmt.Printf(`TestInstanceEventDeferHandler_OnCondition with 1%% failed`)
- t.FailNow()
+ t.Fatalf(`TestInstanceEventDeferHandler_OnCondition with 1%% failed`)
}
}
@@ -63,13 +61,28 @@ func TestInstanceEventDeferHandler_HandleChan(t *testing.T) {
Key: util.StringToBytesWithNoCopy("/3"),
Value: b,
}
+ kv4 := &mvccpb.KeyValue{
+ Key: util.StringToBytesWithNoCopy("/4"),
+ Value: b,
+ }
+ kv5 := &mvccpb.KeyValue{
+ Key: util.StringToBytesWithNoCopy("/5"),
+ Value: b,
+ }
+ kv6 := &mvccpb.KeyValue{
+ Key: util.StringToBytesWithNoCopy("/6"),
+ Value: b,
+ }
cache := NewKvCache(nil, 1)
cache.store["/1"] = kv1
cache.store["/2"] = kv2
cache.store["/3"] = kv3
+ cache.store["/4"] = kv4
+ cache.store["/5"] = kv5
+ cache.store["/6"] = kv6
- evts1 := []*Event{
+ evts1 := []KvEvent{
{
Type: pb.EVT_CREATE,
Object: kv1,
@@ -79,7 +92,7 @@ func TestInstanceEventDeferHandler_HandleChan(t *testing.T) {
Object: kv1,
},
}
- evts2 := []*Event{
+ evts2 := []KvEvent{
{
Type: pb.EVT_DELETE,
Object: kv2,
@@ -88,12 +101,36 @@ func TestInstanceEventDeferHandler_HandleChan(t *testing.T) {
Type: pb.EVT_DELETE,
Object: kv3,
},
+ {
+ Type: pb.EVT_DELETE,
+ Object: kv4,
+ },
+ {
+ Type: pb.EVT_DELETE,
+ Object: kv5,
+ },
+ {
+ Type: pb.EVT_DELETE,
+ Object: kv6,
+ },
}
- evts3 := []*Event{
+ evts3 := []KvEvent{
{
Type: pb.EVT_CREATE,
Object: kv2,
},
+ {
+ Type: pb.EVT_UPDATE,
+ Object: kv4,
+ },
+ {
+ Type: pb.EVT_UPDATE,
+ Object: kv5,
+ },
+ {
+ Type: pb.EVT_CREATE,
+ Object: kv6,
+ },
}
iedh := &InstanceEventDeferHandler{
@@ -117,23 +154,21 @@ func TestInstanceEventDeferHandler_HandleChan(t *testing.T) {
func getEvents(t *testing.T, iedh *InstanceEventDeferHandler) {
fmt.Println(time.Now())
c := time.After(3 * time.Second)
- var evt3 *Event
+ var evt3 *KvEvent
for {
select {
case evt := <-iedh.HandleChan():
fmt.Println(time.Now(), evt)
if string(evt.Object.(*mvccpb.KeyValue).Key) == "/3" {
- evt3 = evt
+ evt3 = &evt
if iedh.Percent == 0.01 && evt.Type == pb.EVT_DELETE {
- fmt.Printf(`TestInstanceEventDeferHandler_HandleChan with 1%% failed`)
- t.FailNow()
+ t.Fatalf(`TestInstanceEventDeferHandler_HandleChan with 1%% failed`)
}
}
continue
case <-c:
if iedh.Percent == 0.8 && evt3 == nil {
- fmt.Printf(`TestInstanceEventDeferHandler_HandleChan with 80%% failed`)
- t.FailNow()
+ t.Fatalf(`TestInstanceEventDeferHandler_HandleChan with 80%% failed`)
}
}
break
diff --git a/server/core/backend/store/event.go b/server/core/backend/store/event.go
index 9991f3d..504aba1 100644
--- a/server/core/backend/store/event.go
+++ b/server/core/backend/store/event.go
@@ -18,7 +18,6 @@ package store
import (
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
- "github.com/coreos/etcd/mvcc/mvccpb"
"sync"
)
@@ -27,7 +26,7 @@ var (
)
func init() {
- evtProxies = make(map[StoreType]*KvEventProxy)
+ evtProxies = make(map[StoreType]*KvEventProxy, typeEnd)
for i := StoreType(0); i != typeEnd; i++ {
evtProxies[i] = &KvEventProxy{
evtHandleFuncs: make([]KvEventFunc, 0, 5),
@@ -35,17 +34,18 @@ func init() {
}
}
-type KvEventFunc func(evt *KvEvent)
+type KvEventFunc func(evt KvEvent)
type KvEvent struct {
Revision int64
- Action proto.EventType
- KV *mvccpb.KeyValue
+ Type proto.EventType
+ Prefix string
+ Object interface{}
}
type KvEventHandler interface {
Type() StoreType
- OnEvent(evt *KvEvent)
+ OnEvent(evt KvEvent)
}
type KvEventProxy struct {
@@ -59,7 +59,7 @@ func (h *KvEventProxy) AddHandleFunc(f KvEventFunc) {
h.lock.Unlock()
}
-func (h *KvEventProxy) OnEvent(evt *KvEvent) {
+func (h *KvEventProxy) OnEvent(evt KvEvent) {
h.lock.RLock()
for _, f := range h.evtHandleFuncs {
f(evt)
diff --git a/server/core/backend/store/indexer.go b/server/core/backend/store/indexer.go
index 5cf3e6d..63c367b 100644
--- a/server/core/backend/store/indexer.go
+++ b/server/core/backend/store/indexer.go
@@ -28,11 +28,6 @@ import (
"time"
)
-const (
- DEFAULT_MAX_EVENT_COUNT = 1000
- DEFAULT_ADD_QUEUE_TIMEOUT = 5 * time.Second
-)
-
var defaultRootKeys map[string]struct{}
func init() {
@@ -45,10 +40,9 @@ func init() {
type Indexer struct {
BuildTimeout time.Duration
cacher Cacher
- cacheType StoreType
prefixIndex map[string]map[string]struct{}
prefixLock sync.RWMutex
- prefixBuildQueue chan *KvEvent
+ prefixBuildQueue chan KvEvent
goroutine *util.GoRoutine
ready chan struct{}
isClose bool
@@ -63,7 +57,7 @@ func (i *Indexer) Search(ctx context.Context, opts ...registry.PluginOpOption) (
op.Revision > 0 ||
(op.Offset >= 0 && op.Limit > 0) {
util.Logger().Debugf("search %s match special options, request etcd server, opts: %s",
- i.cacheType, op)
+ i.cacher.Name(), op)
return backend.Registry().Do(ctx, opts...)
}
@@ -78,7 +72,7 @@ func (i *Indexer) Search(ctx context.Context, opts ...registry.PluginOpOption) (
}
util.Logger().Debugf("can not find any key from %s cache with prefix, request etcd server, key: %s",
- i.cacheType, key)
+ i.cacher.Name(), key)
return backend.Registry().Do(ctx, opts...)
}
@@ -98,7 +92,7 @@ func (i *Indexer) Search(ctx context.Context, opts ...registry.PluginOpOption) (
return resp, nil
}
- util.Logger().Debugf("%s cache does not store this key, request etcd server, key: %s", i.cacheType, key)
+ util.Logger().Debugf("%s cache does not store this key, request etcd server, key: %s", i.cacher.Name(), key)
return backend.Registry().Do(ctx, opts...)
}
@@ -109,7 +103,7 @@ func (i *Indexer) Search(ctx context.Context, opts ...registry.PluginOpOption) (
}
util.Logger().Debugf("do not match any key in %s cache store, request etcd server, key: %s",
- i.cacheType, key)
+ i.cacher.Name(), key)
return backend.Registry().Do(ctx, opts...)
}
@@ -163,8 +157,8 @@ func (i *Indexer) searchPrefixKeyWithCache(ctx context.Context, op registry.Plug
return resp, nil
}
-func (i *Indexer) OnCacheEvent(evt *KvEvent) {
- switch evt.Action {
+func (i *Indexer) OnCacheEvent(evt KvEvent) {
+ switch evt.Type {
case pb.EVT_INIT, pb.EVT_CREATE, pb.EVT_DELETE:
default:
return
@@ -178,9 +172,9 @@ func (i *Indexer) OnCacheEvent(evt *KvEvent) {
ctx, _ := context.WithTimeout(context.Background(), i.BuildTimeout)
select {
case <-ctx.Done():
- key := util.BytesToStringWithNoCopy(evt.KV.Key)
+ key := util.BytesToStringWithNoCopy(evt.Object.(*mvccpb.KeyValue).Key)
util.Logger().Warnf(nil, "add event to build index queue timed out(%s), key is %s [%s] event",
- i.BuildTimeout, key, evt.Action)
+ i.BuildTimeout, key, evt.Type)
case i.prefixBuildQueue <- evt:
}
}
@@ -197,11 +191,11 @@ func (i *Indexer) buildIndex() {
return
}
t := time.Now()
- key := util.BytesToStringWithNoCopy(evt.KV.Key)
+ key := util.BytesToStringWithNoCopy(evt.Object.(*mvccpb.KeyValue).Key)
prefix := key[:strings.LastIndex(key[:len(key)-1], "/")+1]
i.prefixLock.Lock()
- switch evt.Action {
+ switch evt.Type {
case pb.EVT_DELETE:
i.deletePrefixKey(prefix, key)
default:
@@ -210,10 +204,14 @@ func (i *Indexer) buildIndex() {
i.prefixLock.Unlock()
util.LogNilOrWarnf(t, "too long to rebuild(action: %s) index[%d], key is %s",
- evt.Action, key, len(i.prefixIndex))
+ evt.Type, key, len(i.prefixIndex))
+ case <-time.After(10 * time.Second):
+ i.prefixLock.RLock()
+ ReportCacheMetrics(i.cacher.Name(), "index", i.prefixIndex)
+ i.prefixLock.RUnlock()
}
}
- util.Logger().Debugf("build %s index goroutine is stopped", i.cacheType)
+ util.Logger().Debugf("build %s index goroutine is stopped", i.cacher.Name())
})
}
@@ -310,13 +308,12 @@ func (i *Indexer) Ready() <-chan struct{} {
return i.ready
}
-func NewCacheIndexer(t StoreType, cr Cacher) *Indexer {
+func NewCacheIndexer(cr Cacher) *Indexer {
return &Indexer{
BuildTimeout: DEFAULT_ADD_QUEUE_TIMEOUT,
cacher: cr,
- cacheType: t,
prefixIndex: make(map[string]map[string]struct{}, DEFAULT_MAX_EVENT_COUNT),
- prefixBuildQueue: make(chan *KvEvent, DEFAULT_MAX_EVENT_COUNT),
+ prefixBuildQueue: make(chan KvEvent, DEFAULT_MAX_EVENT_COUNT),
goroutine: util.NewGo(context.Background()),
ready: make(chan struct{}),
isClose: true,
diff --git a/server/core/backend/store/listwatch.go b/server/core/backend/store/listwatch.go
index 4ffb9d4..3cf1fe7 100644
--- a/server/core/backend/store/listwatch.go
+++ b/server/core/backend/store/listwatch.go
@@ -27,15 +27,6 @@ import (
"time"
)
-const EVENT_BUS_MAX_SIZE = 1000
-
-type Event struct {
- Revision int64
- Type proto.EventType
- Prefix string
- Object interface{}
-}
-
type ListOptions struct {
Timeout time.Duration
Context context.Context
@@ -52,10 +43,12 @@ type ListWatcher struct {
rev int64
}
-func (lw *ListWatcher) List(op *ListOptions) ([]*mvccpb.KeyValue, error) {
+func (lw *ListWatcher) List(op ListOptions) ([]*mvccpb.KeyValue, error) {
otCtx, _ := context.WithTimeout(op.Context, op.Timeout)
resp, err := lw.Client.Do(otCtx, registry.WatchPrefixOpOptions(lw.Key)...)
if err != nil {
+ util.Logger().Errorf(err, "list key %s failed, rev: %d->0", lw.Key, lw.Revision())
+ lw.setRevision(0)
return nil, err
}
lw.setRevision(resp.Revision)
@@ -73,27 +66,28 @@ func (lw *ListWatcher) setRevision(rev int64) {
lw.rev = rev
}
-func (lw *ListWatcher) Watch(op *ListOptions) *Watcher {
+func (lw *ListWatcher) Watch(op ListOptions) *Watcher {
return newWatcher(lw, op)
}
-func (lw *ListWatcher) doWatch(ctx context.Context, f func(evt []*Event)) error {
+func (lw *ListWatcher) doWatch(ctx context.Context, f func(evt []KvEvent)) error {
+ rev := lw.Revision()
opts := append(
registry.WatchPrefixOpOptions(lw.Key),
- registry.WithRev(lw.Revision()+1),
+ registry.WithRev(rev+1),
registry.WithWatchCallback(
func(message string, resp *registry.PluginResponse) error {
if resp == nil || len(resp.Kvs) == 0 {
return fmt.Errorf("unknown event %s", resp)
}
- util.Logger().Infof("key %s: got a watch response %s from etcd server", lw.Key, resp)
+ util.Logger().Infof("watch prefix key %s, start rev %d+1, event: %s", lw.Key, rev, resp)
lw.setRevision(resp.Revision)
- evts := make([]*Event, len(resp.Kvs))
+ evts := make([]KvEvent, len(resp.Kvs))
for i, kv := range resp.Kvs {
- evt := &Event{Prefix: lw.Key, Revision: kv.ModRevision}
+ evt := KvEvent{Prefix: lw.Key, Revision: kv.ModRevision}
switch {
case resp.Action == registry.Put && kv.Version == 1:
evt.Type, evt.Object = proto.EVT_CREATE, kv
@@ -111,23 +105,25 @@ func (lw *ListWatcher) doWatch(ctx context.Context, f func(evt []*Event)) error
}))
err := lw.Client.Watch(ctx, opts...)
- if err != nil { // compact可能会导致watch失败
+ if err != nil { // compact可能会导致watch失败 or message body size lager than 4MB
+ util.Logger().Errorf(err, "watch key %s failed, start rev: %d+1->%d->0", lw.Key, rev, lw.Revision())
+
lw.setRevision(0)
- f([]*Event{errEvent(lw.Key, err)})
+ f([]KvEvent{errEvent(lw.Key, err)})
}
return err
}
type Watcher struct {
- ListOps *ListOptions
+ ListOps ListOptions
lw *ListWatcher
- bus chan []*Event
+ bus chan []KvEvent
stopCh chan struct{}
stop bool
mux sync.Mutex
}
-func (w *Watcher) EventBus() <-chan []*Event {
+func (w *Watcher) EventBus() <-chan []KvEvent {
return w.bus
}
@@ -148,7 +144,7 @@ func (w *Watcher) process(_ context.Context) {
}
}
-func (w *Watcher) sendEvent(evts []*Event) {
+func (w *Watcher) sendEvent(evts []KvEvent) {
defer util.RecoverAndReport()
w.bus <- evts
}
@@ -165,19 +161,19 @@ func (w *Watcher) Stop() {
w.mux.Unlock()
}
-func errEvent(key string, err error) *Event {
- return &Event{
+func errEvent(key string, err error) KvEvent {
+ return KvEvent{
Type: proto.EVT_ERROR,
Prefix: key,
Object: err,
}
}
-func newWatcher(lw *ListWatcher, listOps *ListOptions) *Watcher {
+func newWatcher(lw *ListWatcher, listOps ListOptions) *Watcher {
w := &Watcher{
ListOps: listOps,
lw: lw,
- bus: make(chan []*Event, EVENT_BUS_MAX_SIZE),
+ bus: make(chan []KvEvent, EVENT_BUS_MAX_SIZE),
stopCh: make(chan struct{}),
}
util.Go(w.process)
diff --git a/server/rest/handler.go b/server/core/backend/store/metric.go
similarity index 62%
copy from server/rest/handler.go
copy to server/core/backend/store/metric.go
index 154ee8f..9661aaa 100644
--- a/server/rest/handler.go
+++ b/server/core/backend/store/metric.go
@@ -14,37 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package rest
+package store
import (
- roa "github.com/apache/incubator-servicecomb-service-center/pkg/rest"
+ "fmt"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
- "github.com/apache/incubator-servicecomb-service-center/server/interceptor"
- "net/http"
- "time"
+ "github.com/apache/incubator-servicecomb-service-center/server/core"
+ "github.com/prometheus/client_golang/prometheus"
)
-const CTX_START_TIMESTAMP = "x-start-timestamp"
+var (
+ cacheSizeGauge = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "service_center",
+ Subsystem: "local",
+ Name: "cache_size_bytes",
+ Help: "Local cache size summary of backend store",
+ }, []string{"instance", "resource", "type"})
+)
func init() {
- // api
- http.Handle("/", &ServerHandler{})
-}
-
-type ServerHandler struct {
+ prometheus.MustRegister(cacheSizeGauge)
}
-func (s *ServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- util.SetRequestContext(r, CTX_START_TIMESTAMP, time.Now())
-
- err := interceptor.InvokeInterceptors(w, r)
- if err != nil {
- return
- }
-
- roa.GetRouter().ServeHTTP(w, r)
-
- // CAUTION: There will be cause a concurrent problem,
- // if here get/set the HTTP request headers.
+func ReportCacheMetrics(resource, t string, obj interface{}) {
+ instance := fmt.Sprint(core.Instance.Endpoints)
+ cacheSizeGauge.WithLabelValues(instance, resource, t).Set(float64(util.Sizeof(obj)))
}
diff --git a/server/core/backend/store/opt.go b/server/core/backend/store/opt.go
index 86142b1..59fbabb 100644
--- a/server/core/backend/store/opt.go
+++ b/server/core/backend/store/opt.go
@@ -21,12 +21,6 @@ import (
"time"
)
-const (
- DEFAULT_MAX_NO_EVENT_INTERVAL = 1 // TODO it should be set to 1 for prevent etcd data is lost accidentally.
- DEFAULT_LISTWATCH_TIMEOUT = 30 * time.Second
- DEFAULT_SELF_PRESERVATION_PERCENT = 0.8
-)
-
type KvCacherCfg struct {
Key string
InitSize int
@@ -74,5 +68,6 @@ func DefaultKvCacherConfig() KvCacherCfg {
Timeout: DEFAULT_LISTWATCH_TIMEOUT,
Period: time.Second,
NoEventMaxInterval: DEFAULT_MAX_NO_EVENT_INTERVAL,
+ InitSize: DEFAULT_CACHE_INIT_SIZE,
}
}
diff --git a/server/core/backend/store/store.go b/server/core/backend/store/store.go
index c462d41..fe5a020 100644
--- a/server/core/backend/store/store.go
+++ b/server/core/backend/store/store.go
@@ -19,74 +19,13 @@ package store
import (
"github.com/apache/incubator-servicecomb-service-center/pkg/async"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
- apt "github.com/apache/incubator-servicecomb-service-center/server/core"
pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
"github.com/apache/incubator-servicecomb-service-center/server/infra/registry"
+ "github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
- "strconv"
"sync"
)
-const (
- DOMAIN StoreType = iota
- PROJECT
- SERVICE
- SERVICE_INDEX
- SERVICE_ALIAS
- SERVICE_TAG
- RULE
- RULE_INDEX
- DEPENDENCY
- DEPENDENCY_RULE
- DEPENDENCY_QUEUE
- SCHEMA // big data should not be stored in memory.
- SCHEMA_SUMMARY
- INSTANCE
- LEASE
- ENDPOINTS
- typeEnd
-)
-
-const TIME_FORMAT = "15:04:05.000"
-
-var TypeNames = []string{
- SERVICE: "SERVICE",
- INSTANCE: "INSTANCE",
- DOMAIN: "DOMAIN",
- SCHEMA: "SCHEMA",
- SCHEMA_SUMMARY: "SCHEMA_SUMMARY",
- RULE: "RULE",
- LEASE: "LEASE",
- SERVICE_INDEX: "SERVICE_INDEX",
- SERVICE_ALIAS: "SERVICE_ALIAS",
- SERVICE_TAG: "SERVICE_TAG",
- RULE_INDEX: "RULE_INDEX",
- DEPENDENCY: "DEPENDENCY",
- DEPENDENCY_RULE: "DEPENDENCY_RULE",
- DEPENDENCY_QUEUE: "DEPENDENCY_QUEUE",
- PROJECT: "PROJECT",
- ENDPOINTS: "ENDPOINTS",
-}
-
-var TypeRoots = map[StoreType]string{
- SERVICE: apt.GetServiceRootKey(""),
- INSTANCE: apt.GetInstanceRootKey(""),
- DOMAIN: apt.GetDomainRootKey() + "/",
- SCHEMA: apt.GetServiceSchemaRootKey(""),
- SCHEMA_SUMMARY: apt.GetServiceSchemaSummaryRootKey(""),
- RULE: apt.GetServiceRuleRootKey(""),
- LEASE: apt.GetInstanceLeaseRootKey(""),
- SERVICE_INDEX: apt.GetServiceIndexRootKey(""),
- SERVICE_ALIAS: apt.GetServiceAliasRootKey(""),
- SERVICE_TAG: apt.GetServiceTagRootKey(""),
- RULE_INDEX: apt.GetServiceRuleIndexRootKey(""),
- DEPENDENCY: apt.GetServiceDependencyRootKey(""),
- DEPENDENCY_RULE: apt.GetServiceDependencyRuleRootKey(""),
- DEPENDENCY_QUEUE: apt.GetServiceDependencyQueueRootKey(""),
- PROJECT: apt.GetProjectRootKey(""),
- ENDPOINTS: apt.GetEndpointsRootKey(""),
-}
-
var store = &KvStore{}
func init() {
@@ -95,15 +34,6 @@ func init() {
AddEventHandleFunc(LEASE, store.onLeaseEvent)
}
-type StoreType int
-
-func (st StoreType) String() string {
- if int(st) < len(TypeNames) {
- return TypeNames[st]
- }
- return "TYPE" + strconv.Itoa(int(st))
-}
-
type KvStore struct {
indexers map[StoreType]*Indexer
asyncTaskSvc *async.AsyncTaskService
@@ -124,26 +54,21 @@ func (s *KvStore) Initialize() {
}
}
-func (s *KvStore) dispatchEvent(t StoreType, evt *KvEvent) {
+func (s *KvStore) dispatchEvent(t StoreType, evt KvEvent) {
s.indexers[t].OnCacheEvent(evt)
EventProxy(t).OnEvent(evt)
}
func (s *KvStore) newStore(t StoreType, opts ...KvCacherCfgOption) {
- opts = append(opts,
- WithKey(TypeRoots[t]),
- WithInitSize(s.StoreSize(t)),
- WithEventFunc(func(evt *KvEvent) { s.dispatchEvent(t, evt) }),
- )
- s.newIndexer(t, NewKvCacher(opts...))
+ s.newIndexBuilder(t, NewKvCacher(t.String(), s.getKvCacherCfgOptions(t)...))
}
func (s *KvStore) newNullStore(t StoreType) {
- s.newIndexer(t, NullCacher)
+ s.newIndexBuilder(t, NullCacher)
}
-func (s *KvStore) newIndexer(t StoreType, cacher Cacher) {
- indexer := NewCacheIndexer(t, cacher)
+func (s *KvStore) newIndexBuilder(t StoreType, cacher Cacher) {
+ indexer := NewCacheIndexer(cacher)
s.indexers[t] = indexer
indexer.Run()
}
@@ -153,15 +78,19 @@ func (s *KvStore) Run() {
s.asyncTaskSvc.Run()
}
-func (s *KvStore) StoreSize(t StoreType) int {
+func (s *KvStore) getKvCacherCfgOptions(t StoreType) (opts []KvCacherCfgOption) {
switch t {
- case DOMAIN:
- return 10
- case INSTANCE, LEASE:
- return 1000
- default:
- return 100
+ case INSTANCE:
+ opts = append(opts, WithDeferHandler(s.SelfPreservationHandler()))
}
+ sz := TypeInitSize[t]
+ if sz > 0 {
+ opts = append(opts, WithInitSize(sz))
+ }
+ opts = append(opts,
+ WithKey(TypeRoots[t]),
+ WithEventFunc(func(evt KvEvent) { s.dispatchEvent(t, evt) }))
+ return
}
func (s *KvStore) SelfPreservationHandler() DeferHandler {
@@ -171,14 +100,17 @@ func (s *KvStore) SelfPreservationHandler() DeferHandler {
func (s *KvStore) store(ctx context.Context) {
for t := StoreType(0); t != typeEnd; t++ {
switch t {
- case INSTANCE:
- s.newStore(t, WithDeferHandler(s.SelfPreservationHandler()))
case SCHEMA:
continue
default:
s.newStore(t)
}
}
+
+ s.wait(ctx)
+}
+
+func (s *KvStore) wait(ctx context.Context) {
for _, i := range s.indexers {
select {
case <-ctx.Done():
@@ -191,18 +123,13 @@ func (s *KvStore) store(ctx context.Context) {
util.Logger().Debugf("all indexers are ready")
}
-func (s *KvStore) onLeaseEvent(evt *KvEvent) {
- if evt.Action != pb.EVT_DELETE {
+func (s *KvStore) onLeaseEvent(evt KvEvent) {
+ if evt.Type != pb.EVT_DELETE {
return
}
- key := util.BytesToStringWithNoCopy(evt.KV.Key)
- leaseID := util.BytesToStringWithNoCopy(evt.KV.Value)
-
+ key := util.BytesToStringWithNoCopy(evt.Object.(*mvccpb.KeyValue).Key)
s.asyncTaskSvc.DeferRemove(ToLeaseAsyncTaskKey(key))
-
- util.Logger().Debugf("push task to async remove queue successfully, key %s %s [%s] event",
- key, leaseID, evt.Action)
}
func (s *KvStore) closed() bool {
return s.isClose
diff --git a/server/core/proto/services.go b/server/core/proto/services.go
index 45ecd48..026b318 100644
--- a/server/core/proto/services.go
+++ b/server/core/proto/services.go
@@ -100,6 +100,8 @@ type ServerConfig struct {
CompactIndexDelta int64 `json:"compactIndexDelta"`
CompactInterval string `json:"compactInterval"`
+ EnablePProf bool `json:"enablePProf"`
+
LoggerName string `json:"-"`
LogRotateSize int64 `json:"logRotateSize"`
LogBackupCount int64 `json:"logBackupCount"`
diff --git a/server/plugin/infra/registry/etcd/etcd_test.go b/server/plugin/infra/registry/etcd/etcd_test.go
index 7f726dd..896dd72 100644
--- a/server/plugin/infra/registry/etcd/etcd_test.go
+++ b/server/plugin/infra/registry/etcd/etcd_test.go
@@ -69,7 +69,6 @@ func TestEtcdClient_Delete(t *testing.T) {
panic(err)
}
if len(resp.Kvs) != 1 || string(resp.Kvs[0].Key) != "/test_range/a" {
- fmt.Println("TestEtcdClient_Delete failed, %#v", resp.Kvs)
- t.FailNow()
+ t.Fatalf("TestEtcdClient_Delete failed, %#v", resp.Kvs)
}
}
diff --git a/server/plugin/infra/tracing/buildin/file_collector_test.go b/server/plugin/infra/tracing/buildin/file_collector_test.go
index 4f3e150..92956db 100644
--- a/server/plugin/infra/tracing/buildin/file_collector_test.go
+++ b/server/plugin/infra/tracing/buildin/file_collector_test.go
@@ -17,7 +17,6 @@
package buildin
import (
- "fmt"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
"github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
"golang.org/x/net/context"
@@ -43,8 +42,7 @@ func TestFileCollector_Collect(t *testing.T) {
for i := 0; i < 10; i++ {
err := fc.Collect(&zipkincore.Span{})
if err != nil {
- fmt.Println(err)
- t.FailNow()
+ t.Fatal(err)
}
}
diff --git a/server/plugin/infra/tracing/buildin/span_test.go b/server/plugin/infra/tracing/buildin/span_test.go
index 069e6a9..c44ba2a 100644
--- a/server/plugin/infra/tracing/buildin/span_test.go
+++ b/server/plugin/infra/tracing/buildin/span_test.go
@@ -148,22 +148,19 @@ func TestFromZipkinSpan(t *testing.T) {
span := &zipkincore.Span{}
err := json.Unmarshal(sample, &span)
if err != nil {
- fmt.Println("TestFromZipkinSpan Unmarshal", err)
- t.FailNow()
+ t.Fatalf("TestFromZipkinSpan Unmarshal", err)
}
s := FromZipkinSpan(span)
b, err := json.Marshal(s)
if err != nil {
- fmt.Println("TestFromZipkinSpan Marshal", err)
- t.FailNow()
+ t.Fatalf("TestFromZipkinSpan Marshal", err)
}
fmt.Println(string(b))
s = FromZipkinSpan(&zipkincore.Span{})
b, err = json.Marshal(s)
if err != nil {
- fmt.Println("TestFromZipkinSpan Marshal", err)
- t.FailNow()
+ t.Fatalf("TestFromZipkinSpan Marshal", err)
}
fmt.Println(string(b))
}
diff --git a/server/rest/handler.go b/server/rest/handler.go
index 154ee8f..3cd5e22 100644
--- a/server/rest/handler.go
+++ b/server/rest/handler.go
@@ -29,7 +29,7 @@ const CTX_START_TIMESTAMP = "x-start-timestamp"
func init() {
// api
- http.Handle("/", &ServerHandler{})
+ RegisterServerHandler("/", &ServerHandler{})
}
type ServerHandler struct {
diff --git a/server/rest/metric.go b/server/rest/metric.go
index 5642382..e97fb5d 100644
--- a/server/rest/metric.go
+++ b/server/rest/metric.go
@@ -57,7 +57,7 @@ var (
func init() {
prometheus.MustRegister(incomingRequests, successfulRequests, reqDurations)
- http.Handle("/metrics", prometheus.Handler())
+ RegisterServerHandler("/metrics", prometheus.Handler())
}
func ReportRequestCompleted(w http.ResponseWriter, r *http.Request, start time.Time) {
diff --git a/server/rest/server.go b/server/rest/server.go
index fda615f..1ef48ae 100644
--- a/server/rest/server.go
+++ b/server/rest/server.go
@@ -107,6 +107,7 @@ func NewServer(ep string) (srv *rest.Server, err error) {
}
srvCfg.Addr = ipAddr
srv = rest.NewServer(srvCfg)
+ srv.Handler = DefaultServerMux
if srvCfg.TLSConfig == nil {
err = srv.Listen()
diff --git a/pkg/util/util_test.go b/server/rest/server_mux.go
similarity index 57%
copy from pkg/util/util_test.go
copy to server/rest/server_mux.go
index 3a54c36..f55c59f 100644
--- a/pkg/util/util_test.go
+++ b/server/rest/server_mux.go
@@ -14,47 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package util
+package rest
-import (
- "fmt"
- "testing"
-)
+import "net/http"
-func fail(t *testing.T, format string, args ...interface{}) {
- fmt.Printf(format, args...)
- fmt.Println()
- t.FailNow()
-}
-
-func TestBytesToInt32(t *testing.T) {
- bs := []byte{0, 0, 0, 1}
- i := BytesToInt32(bs)
- if i != 1 {
- t.FailNow()
- }
-
- bs = []byte{1, 0, 0, 0}
- i = BytesToInt32(bs)
- if i != 1<<(3*8) {
- t.FailNow()
- }
+var DefaultServerMux = http.NewServeMux()
- bs = []byte{0, 0, 0, 0, 1}
- i = BytesToInt32(bs)
- if i != 0 {
- t.FailNow()
- }
-
- bs = []byte{1}
- i = BytesToInt32(bs)
- if i != 1 {
- t.FailNow()
- }
+func RegisterServerHandleFunc(pattern string, f http.HandlerFunc) {
+ DefaultServerMux.HandleFunc(pattern, f)
+}
- bs = []byte{1, 0}
- i = BytesToInt32(bs)
- if i != 1<<8 {
- t.FailNow()
- }
+func RegisterServerHandler(pattern string, h http.Handler) {
+ DefaultServerMux.Handle(pattern, h)
}
diff --git a/server/service/event/dependency_event_handler.go b/server/service/event/dependency_event_handler.go
index 6961c81..836008b 100644
--- a/server/service/event/dependency_event_handler.go
+++ b/server/service/event/dependency_event_handler.go
@@ -40,8 +40,8 @@ func (h *DependencyEventHandler) Type() store.StoreType {
return store.DEPENDENCY_QUEUE
}
-func (h *DependencyEventHandler) OnEvent(evt *store.KvEvent) {
- action := evt.Action
+func (h *DependencyEventHandler) OnEvent(evt store.KvEvent) {
+ action := evt.Type
if action != pb.EVT_CREATE && action != pb.EVT_UPDATE && action != pb.EVT_INIT {
return
}
diff --git a/server/service/event/instance_event_handler.go b/server/service/event/instance_event_handler.go
index 0e0c02c..e9ee8b7 100644
--- a/server/service/event/instance_event_handler.go
+++ b/server/service/event/instance_event_handler.go
@@ -24,6 +24,7 @@ import (
pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
nf "github.com/apache/incubator-servicecomb-service-center/server/service/notification"
serviceUtil "github.com/apache/incubator-servicecomb-service-center/server/service/util"
+ "github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
"strings"
)
@@ -35,13 +36,13 @@ func (h *InstanceEventHandler) Type() store.StoreType {
return store.INSTANCE
}
-func (h *InstanceEventHandler) OnEvent(evt *store.KvEvent) {
- action := evt.Action
+func (h *InstanceEventHandler) OnEvent(evt store.KvEvent) {
+ action := evt.Type
if action == pb.EVT_INIT {
return
}
- kv := evt.KV
+ kv := evt.Object.(*mvccpb.KeyValue)
providerId, providerInstanceId, domainProject, data := pb.GetInfoFromInstKV(kv)
if data == nil {
util.Logger().Errorf(nil,
diff --git a/server/service/event/rule_event_handler.go b/server/service/event/rule_event_handler.go
index f85b62c..bca8390 100644
--- a/server/service/event/rule_event_handler.go
+++ b/server/service/event/rule_event_handler.go
@@ -24,6 +24,7 @@ import (
pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
nf "github.com/apache/incubator-servicecomb-service-center/server/service/notification"
serviceUtil "github.com/apache/incubator-servicecomb-service-center/server/service/util"
+ "github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
)
@@ -79,13 +80,13 @@ func (h *RuleEventHandler) Type() store.StoreType {
return store.RULE
}
-func (h *RuleEventHandler) OnEvent(evt *store.KvEvent) {
- action := evt.Action
+func (h *RuleEventHandler) OnEvent(evt store.KvEvent) {
+ action := evt.Type
if action == pb.EVT_INIT {
return
}
- kv := evt.KV
+ kv := evt.Object.(*mvccpb.KeyValue)
providerId, ruleId, domainProject, data := pb.GetInfoFromRuleKV(kv)
if data == nil {
util.Logger().Errorf(nil,
diff --git a/server/service/event/service_event_handler.go b/server/service/event/service_event_handler.go
index 2aa6f4d..8ffd805 100644
--- a/server/service/event/service_event_handler.go
+++ b/server/service/event/service_event_handler.go
@@ -21,6 +21,7 @@ import (
"github.com/apache/incubator-servicecomb-service-center/server/core/backend/store"
pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
serviceUtil "github.com/apache/incubator-servicecomb-service-center/server/service/util"
+ "github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
"strings"
)
@@ -32,13 +33,13 @@ func (h *ServiceEventHandler) Type() store.StoreType {
return store.SERVICE
}
-func (h *ServiceEventHandler) OnEvent(evt *store.KvEvent) {
- action := evt.Action
+func (h *ServiceEventHandler) OnEvent(evt store.KvEvent) {
+ action := evt.Type
if action != pb.EVT_CREATE && action != pb.EVT_INIT {
return
}
- kv := evt.KV
+ kv := evt.Object.(*mvccpb.KeyValue)
serviceId, domainProject, data := pb.GetInfoFromSvcKV(kv)
if data == nil {
util.Logger().Errorf(nil,
diff --git a/server/service/event/tag_event_handler.go b/server/service/event/tag_event_handler.go
index cdf96d6..2b14f29 100644
--- a/server/service/event/tag_event_handler.go
+++ b/server/service/event/tag_event_handler.go
@@ -24,6 +24,7 @@ import (
pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
nf "github.com/apache/incubator-servicecomb-service-center/server/service/notification"
serviceUtil "github.com/apache/incubator-servicecomb-service-center/server/service/util"
+ "github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
)
@@ -90,13 +91,13 @@ func (h *TagEventHandler) Type() store.StoreType {
return store.SERVICE_TAG
}
-func (h *TagEventHandler) OnEvent(evt *store.KvEvent) {
- action := evt.Action
+func (h *TagEventHandler) OnEvent(evt store.KvEvent) {
+ action := evt.Type
if action == pb.EVT_INIT {
return
}
- kv := evt.KV
+ kv := evt.Object.(*mvccpb.KeyValue)
consumerId, domainProject, data := pb.GetInfoFromTagKV(kv)
if data == nil {
util.Logger().Errorf(nil,
diff --git a/server/service/util/dependency_test.go b/server/service/util/dependency_test.go
index 4ca26b8..d28b0f1 100644
--- a/server/service/util/dependency_test.go
+++ b/server/service/util/dependency_test.go
@@ -17,7 +17,6 @@
package util
import (
- "fmt"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
"golang.org/x/net/context"
@@ -27,8 +26,7 @@ import (
func TestDeleteDependencyForService(t *testing.T) {
_, err := DeleteDependencyForDeleteService("", "", &proto.MicroServiceKey{})
if err != nil {
- fmt.Printf(`DeleteDependencyForDeleteService failed`)
- t.FailNow()
+ t.Fatalf(`DeleteDependencyForDeleteService failed`)
}
_, err = updateProviderDependencyRuleUtil(
@@ -41,8 +39,7 @@ func TestDeleteDependencyForService(t *testing.T) {
AppId: "a",
}, "")
if err != nil {
- fmt.Printf(`deleteDependencyRuleUtil with the same deps failed`)
- t.FailNow()
+ t.Fatalf(`deleteDependencyRuleUtil with the same deps failed`)
}
_, err = updateProviderDependencyRuleUtil(
@@ -55,42 +52,36 @@ func TestDeleteDependencyForService(t *testing.T) {
AppId: "a",
}, "")
if err != nil {
- fmt.Printf(`deleteDependencyRuleUtil failed`)
- t.FailNow()
+ t.Fatalf(`deleteDependencyRuleUtil failed`)
}
_, err = deleteConsumerDepOfProviderRule(context.Background(), "", &proto.MicroServiceKey{}, &proto.MicroServiceKey{})
if err == nil {
- fmt.Printf(`deleteConsumerDepOfProviderRule failed`)
- t.FailNow()
+ t.Fatalf(`deleteConsumerDepOfProviderRule failed`)
}
_, err = deleteDepRuleUtil("", &proto.MicroServiceDependency{}, &proto.MicroServiceKey{})
if err != nil {
- fmt.Printf(`deleteDepRuleUtil failed`)
- t.FailNow()
+ t.Fatalf(`deleteDepRuleUtil failed`)
}
}
func TestTransferToMicroServiceDependency(t *testing.T) {
_, err := TransferToMicroServiceDependency(util.SetContext(context.Background(), "cacheOnly", "1"), "")
if err != nil {
- fmt.Printf(`TransferToMicroServiceDependency WithCacheOnly failed`)
- t.FailNow()
+ t.Fatalf(`TransferToMicroServiceDependency WithCacheOnly failed`)
}
_, err = TransferToMicroServiceDependency(context.Background(), "")
if err == nil {
- fmt.Printf(`TransferToMicroServiceDependency failed`)
- t.FailNow()
+ t.Fatalf(`TransferToMicroServiceDependency failed`)
}
}
func TestEqualServiceDependency(t *testing.T) {
b := equalServiceDependency(&proto.MicroServiceKey{}, &proto.MicroServiceKey{})
if !b {
- fmt.Printf(`equalServiceDependency failed`)
- t.FailNow()
+ t.Fatalf(`equalServiceDependency failed`)
}
b = equalServiceDependency(&proto.MicroServiceKey{
@@ -99,8 +90,7 @@ func TestEqualServiceDependency(t *testing.T) {
AppId: "b",
})
if b {
- fmt.Printf(`equalServiceDependency failed`)
- t.FailNow()
+ t.Fatalf(`equalServiceDependency failed`)
}
}
@@ -109,28 +99,24 @@ func TestCreateDependencyRule(t *testing.T) {
Consumer: &proto.MicroServiceKey{},
})
if err != nil {
- fmt.Printf(`CreateDependencyRule failed`)
- t.FailNow()
+ t.Fatalf(`CreateDependencyRule failed`)
}
err = AddDependencyRule(context.Background(), &Dependency{
Consumer: &proto.MicroServiceKey{},
})
if err != nil {
- fmt.Printf(`AddDependencyRule failed`)
- t.FailNow()
+ t.Fatalf(`AddDependencyRule failed`)
}
err = AddServiceVersionRule(context.Background(), "", &proto.MicroService{}, &proto.MicroServiceKey{})
if err == nil {
- fmt.Printf(`AddServiceVersionRule failed`)
- t.FailNow()
+ t.Fatalf(`AddServiceVersionRule failed`)
}
_, err = addDepRuleUtil("", &proto.MicroServiceDependency{}, &proto.MicroServiceKey{})
if err != nil {
- fmt.Printf(`addDepRuleUtil failed`)
- t.FailNow()
+ t.Fatalf(`addDepRuleUtil failed`)
}
b, err := containServiceDependency([]*proto.MicroServiceKey{
@@ -139,8 +125,7 @@ func TestCreateDependencyRule(t *testing.T) {
AppId: "b",
})
if b {
- fmt.Printf(`containServiceDependency contain failed`)
- t.FailNow()
+ t.Fatalf(`containServiceDependency contain failed`)
}
b, err = containServiceDependency([]*proto.MicroServiceKey{
@@ -149,14 +134,12 @@ func TestCreateDependencyRule(t *testing.T) {
AppId: "a",
})
if !b {
- fmt.Printf(`containServiceDependency not contain failed`)
- t.FailNow()
+ t.Fatalf(`containServiceDependency not contain failed`)
}
_, err = containServiceDependency(nil, nil)
if err == nil {
- fmt.Printf(`containServiceDependency invalid failed`)
- t.FailNow()
+ t.Fatalf(`containServiceDependency invalid failed`)
}
ok := diffServiceVersion(&proto.MicroServiceKey{
@@ -169,20 +152,17 @@ func TestCreateDependencyRule(t *testing.T) {
Version: "2",
})
if !ok {
- fmt.Printf(`diffServiceVersion failed`)
- t.FailNow()
+ t.Fatalf(`diffServiceVersion failed`)
}
err = validateMicroServiceKey(&proto.MicroServiceKey{}, false)
if err == nil {
- fmt.Printf(`validateMicroServiceKey false invalid failed`)
- t.FailNow()
+ t.Fatalf(`validateMicroServiceKey false invalid failed`)
}
err = validateMicroServiceKey(&proto.MicroServiceKey{}, true)
if err == nil {
- fmt.Printf(`validateMicroServiceKey true invalid failed`)
- t.FailNow()
+ t.Fatalf(`validateMicroServiceKey true invalid failed`)
}
err = validateMicroServiceKey(&proto.MicroServiceKey{
@@ -191,8 +171,7 @@ func TestCreateDependencyRule(t *testing.T) {
Version: "latest",
}, true)
if err != nil {
- fmt.Printf(`validateMicroServiceKey true failed`)
- t.FailNow()
+ t.Fatalf(`validateMicroServiceKey true failed`)
}
err = validateMicroServiceKey(&proto.MicroServiceKey{
@@ -201,14 +180,12 @@ func TestCreateDependencyRule(t *testing.T) {
Version: "1.0.0",
}, false)
if err != nil {
- fmt.Printf(`validateMicroServiceKey false failed`)
- t.FailNow()
+ t.Fatalf(`validateMicroServiceKey false failed`)
}
ok = isDependencyAll(&proto.MicroServiceDependency{})
if ok {
- fmt.Printf(`isDependencyAll not * failed`)
- t.FailNow()
+ t.Fatalf(`isDependencyAll not * failed`)
}
ok = isDependencyAll(&proto.MicroServiceDependency{
@@ -219,8 +196,7 @@ func TestCreateDependencyRule(t *testing.T) {
},
})
if !ok {
- fmt.Printf(`isDependencyAll * failed`)
- t.FailNow()
+ t.Fatalf(`isDependencyAll * failed`)
}
ok = isExist([]*proto.MicroServiceKey{
@@ -231,24 +207,21 @@ func TestCreateDependencyRule(t *testing.T) {
ServiceName: "*",
})
if !ok {
- fmt.Printf(`isExist failed`)
- t.FailNow()
+ t.Fatalf(`isExist failed`)
}
}
func TestBadParamsResponse(t *testing.T) {
p := BadParamsResponse("a")
if p == nil {
- fmt.Printf(`BadParamsResponse failed`)
- t.FailNow()
+ t.Fatalf(`BadParamsResponse failed`)
}
}
func TestParamsChecker(t *testing.T) {
p := ParamsChecker(nil, nil)
if p == nil || p.Response.Code == proto.Response_SUCCESS {
- fmt.Printf(`ParamsChecker invalid failed`)
- t.FailNow()
+ t.Fatalf(`ParamsChecker invalid failed`)
}
p = ParamsChecker(&proto.MicroServiceKey{
@@ -257,8 +230,7 @@ func TestParamsChecker(t *testing.T) {
Version: "1.0.0",
}, nil)
if p == nil || p.Response.Code == proto.Response_SUCCESS {
- fmt.Printf(`ParamsChecker invalid failed`)
- t.FailNow()
+ t.Fatalf(`ParamsChecker invalid failed`)
}
p = ParamsChecker(&proto.MicroServiceKey{
@@ -269,8 +241,7 @@ func TestParamsChecker(t *testing.T) {
{ServiceName: "*"},
})
if p != nil {
- fmt.Printf(`ParamsChecker * failed`)
- t.FailNow()
+ t.Fatalf(`ParamsChecker * failed`)
}
p = ParamsChecker(&proto.MicroServiceKey{
@@ -281,8 +252,7 @@ func TestParamsChecker(t *testing.T) {
{},
})
if p == nil {
- fmt.Printf(`ParamsChecker invalid provider key failed`)
- t.FailNow()
+ t.Fatalf(`ParamsChecker invalid provider key failed`)
}
p = ParamsChecker(&proto.MicroServiceKey{
@@ -294,30 +264,26 @@ func TestParamsChecker(t *testing.T) {
{ServiceName: "a", Version: "1"},
})
if p == nil {
- fmt.Printf(`ParamsChecker duplicate provider key failed`)
- t.FailNow()
+ t.Fatalf(`ParamsChecker duplicate provider key failed`)
}
}
func TestServiceDependencyRuleExist(t *testing.T) {
_, err := ProviderDependencyRuleExist(util.SetContext(context.Background(), "cacheOnly", "1"), &proto.MicroServiceKey{}, &proto.MicroServiceKey{})
if err != nil {
- fmt.Printf(`ServiceDependencyRuleExist WithCacheOnly failed`)
- t.FailNow()
+ t.Fatalf(`ServiceDependencyRuleExist WithCacheOnly failed`)
}
_, err = ProviderDependencyRuleExist(context.Background(), &proto.MicroServiceKey{}, &proto.MicroServiceKey{})
if err == nil {
- fmt.Printf(`ServiceDependencyRuleExist failed`)
- t.FailNow()
+ t.Fatalf(`ServiceDependencyRuleExist failed`)
}
}
func TestUpdateServiceForAddDependency(t *testing.T) {
_, _, err := updateDepRuleUtil("", &proto.MicroServiceDependency{}, &proto.MicroServiceKey{})
if err != nil {
- fmt.Printf(`updateDepRuleUtil failed`)
- t.FailNow()
+ t.Fatalf(`updateDepRuleUtil failed`)
}
old := isNeedUpdate([]*proto.MicroServiceKey{
@@ -332,28 +298,24 @@ func TestUpdateServiceForAddDependency(t *testing.T) {
Version: "2",
})
if old == nil {
- fmt.Printf(`isNeedUpdate failed`)
- t.FailNow()
+ t.Fatalf(`isNeedUpdate failed`)
}
}
func TestFilter(t *testing.T) {
_, _, err := getConsumerIdsWithFilter(context.Background(), "", &proto.MicroService{}, noFilter)
if err == nil {
- fmt.Printf(`getConsumerIdsWithFilter failed`)
- t.FailNow()
+ t.Fatalf(`getConsumerIdsWithFilter failed`)
}
_, _, err = filterConsumerIds(context.Background(), []string{}, noFilter)
if err != nil {
- fmt.Printf(`filterConsumerIds invalid failed`)
- t.FailNow()
+ t.Fatalf(`filterConsumerIds invalid failed`)
}
_, _, err = filterConsumerIds(context.Background(), []string{"a"}, noFilter)
if err != nil {
- fmt.Printf(`filterConsumerIds invalid failed`)
- t.FailNow()
+ t.Fatalf(`filterConsumerIds invalid failed`)
}
rf := RuleFilter{
@@ -363,8 +325,7 @@ func TestFilter(t *testing.T) {
}
_, _, err = filterConsumerIds(context.Background(), []string{"a"}, rf.Filter)
if err != nil {
- fmt.Printf(`filterConsumerIds invalid failed`)
- t.FailNow()
+ t.Fatalf(`filterConsumerIds invalid failed`)
}
}
@@ -381,8 +342,7 @@ func TestDependency(t *testing.T) {
d.AddConsumerOfProviderRule()
err := d.UpdateProvidersRuleOfConsumer("")
if err == nil {
- fmt.Printf(`Dependency_UpdateProvidersRuleOfConsumer failed`)
- t.FailNow()
+ t.Fatalf(`Dependency_UpdateProvidersRuleOfConsumer failed`)
}
dr := &DependencyRelation{
@@ -392,8 +352,7 @@ func TestDependency(t *testing.T) {
}
_, err = dr.GetDependencyProviders()
if err != nil {
- fmt.Printf(`DependencyRelation_GetDependencyProviders failed`)
- t.FailNow()
+ t.Fatalf(`DependencyRelation_GetDependencyProviders failed`)
}
_, err = dr.getDependencyProviderIds([]*proto.MicroServiceKey{
@@ -402,34 +361,29 @@ func TestDependency(t *testing.T) {
{ServiceName: "b", Version: "latest"},
})
if err != nil {
- fmt.Printf(`DependencyRelation_getDependencyProviderIds * WithCacheOnly failed`)
- t.FailNow()
+ t.Fatalf(`DependencyRelation_getDependencyProviderIds * WithCacheOnly failed`)
}
_, err = dr.getDependencyProviderIds([]*proto.MicroServiceKey{
{ServiceName: "a", Version: "1.0.0"},
{ServiceName: "b", Version: "latest"},
})
if err != nil {
- fmt.Printf(`DependencyRelation_getDependencyProviderIds WithCacheOnly failed`)
- t.FailNow()
+ t.Fatalf(`DependencyRelation_getDependencyProviderIds WithCacheOnly failed`)
}
_, err = dr.GetDependencyConsumers()
if err != nil {
- fmt.Printf(`DependencyRelation_GetDependencyConsumers WithCacheOnly failed`)
- t.FailNow()
+ t.Fatalf(`DependencyRelation_GetDependencyConsumers WithCacheOnly failed`)
}
_, err = dr.getServiceByMicroServiceKey(&proto.MicroServiceKey{})
if err != nil {
- fmt.Printf(`DependencyRelation_getServiceByMicroServiceKey WithCacheOnly failed`)
- t.FailNow()
+ t.Fatalf(`DependencyRelation_getServiceByMicroServiceKey WithCacheOnly failed`)
}
_, err = dr.getConsumerOfSameServiceNameAndAppId(&proto.MicroServiceKey{})
if err != nil {
- fmt.Printf(`DependencyRelation_getConsumerOfSameServiceNameAndAppId WithCacheOnly failed`)
- t.FailNow()
+ t.Fatalf(`DependencyRelation_getConsumerOfSameServiceNameAndAppId WithCacheOnly failed`)
}
dr = &DependencyRelation{
@@ -441,34 +395,29 @@ func TestDependency(t *testing.T) {
{ServiceName: "*"},
})
if err == nil {
- fmt.Printf(`DependencyRelation_getDependencyProviderIds * failed`)
- t.FailNow()
+ t.Fatalf(`DependencyRelation_getDependencyProviderIds * failed`)
}
_, err = dr.getDependencyProviderIds([]*proto.MicroServiceKey{
{ServiceName: "a", Version: "1.0.0"},
{ServiceName: "b", Version: "latest"},
})
if err == nil {
- fmt.Printf(`DependencyRelation_getDependencyProviderIds failed`)
- t.FailNow()
+ t.Fatalf(`DependencyRelation_getDependencyProviderIds failed`)
}
_, err = dr.GetDependencyConsumers()
if err == nil {
- fmt.Printf(`DependencyRelation_GetDependencyConsumers failed`)
- t.FailNow()
+ t.Fatalf(`DependencyRelation_GetDependencyConsumers failed`)
}
_, err = dr.getServiceByMicroServiceKey(&proto.MicroServiceKey{})
if err == nil {
- fmt.Printf(`DependencyRelation_getServiceByMicroServiceKey failed`)
- t.FailNow()
+ t.Fatalf(`DependencyRelation_getServiceByMicroServiceKey failed`)
}
_, err = dr.getConsumerOfSameServiceNameAndAppId(&proto.MicroServiceKey{})
if err == nil {
- fmt.Printf(`DependencyRelation_getConsumerOfSameServiceNameAndAppId failed`)
- t.FailNow()
+ t.Fatalf(`DependencyRelation_getConsumerOfSameServiceNameAndAppId failed`)
}
dr = &DependencyRelation{
@@ -477,8 +426,7 @@ func TestDependency(t *testing.T) {
}
_, err = dr.getDependencyConsumersOfProvider()
if err == nil {
- fmt.Printf(`DependencyRelation_getDependencyConsumersOfProvider failed`)
- t.FailNow()
+ t.Fatalf(`DependencyRelation_getDependencyConsumersOfProvider failed`)
}
}
@@ -488,7 +436,6 @@ func TestDependencyRelationFilterOpt(t *testing.T) {
WithoutSelfDependency(),
)
if !op.NonSelf || !op.SameDomainProject {
- fmt.Printf(`toDependencyRelationFilterOpt failed`)
- t.FailNow()
+ t.Fatalf(`toDependencyRelationFilterOpt failed`)
}
}
diff --git a/server/service/util/domain_util_test.go b/server/service/util/domain_util_test.go
index b7ed660..331bd27 100644
--- a/server/service/util/domain_util_test.go
+++ b/server/service/util/domain_util_test.go
@@ -17,7 +17,6 @@
package util_test
import (
- "fmt"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
serviceUtil "github.com/apache/incubator-servicecomb-service-center/server/service/util"
"golang.org/x/net/context"
@@ -27,77 +26,66 @@ import (
func TestGetDomain(t *testing.T) {
_, err := serviceUtil.GetAllDomainRawData(util.SetContext(context.Background(), "cacheOnly", "1"))
if err != nil {
- fmt.Printf("GetAllDomainRawData WithCacheOnly failed")
- t.FailNow()
+ t.Fatalf("GetAllDomainRawData WithCacheOnly failed")
}
_, err = serviceUtil.GetAllDomainRawData(context.Background())
if err == nil {
- fmt.Printf("GetAllDomainRawData failed")
- t.FailNow()
+ t.Fatalf("GetAllDomainRawData failed")
}
_, err = serviceUtil.GetAllDomain(util.SetContext(context.Background(), "cacheOnly", "1"))
if err != nil {
- fmt.Printf("GetAllDomain WithCacheOnly failed")
- t.FailNow()
+ t.Fatalf("GetAllDomain WithCacheOnly failed")
}
_, err = serviceUtil.GetAllDomain(context.Background())
if err == nil {
- fmt.Printf("GetAllDomain failed")
- t.FailNow()
+ t.Fatalf("GetAllDomain failed")
}
}
func TestDomainExist(t *testing.T) {
_, err := serviceUtil.DomainExist(util.SetContext(context.Background(), "cacheOnly", "1"), "")
if err != nil {
- fmt.Printf("DomainExist WithCacheOnly failed")
- t.FailNow()
+ t.Fatalf("DomainExist WithCacheOnly failed")
}
_, err = serviceUtil.DomainExist(context.Background(), "")
if err == nil {
- fmt.Printf("DomainExist failed")
- t.FailNow()
+ t.Fatalf("DomainExist failed")
}
}
func TestNewDomain(t *testing.T) {
err := serviceUtil.NewDomain(context.Background(), "")
if err == nil {
- fmt.Printf("NewDomain failed")
- t.FailNow()
+ t.Fatalf("NewDomain failed")
}
}
func TestProjectExist(t *testing.T) {
_, err := serviceUtil.ProjectExist(util.SetContext(context.Background(), "cacheOnly", "1"), "", "")
if err != nil {
- fmt.Printf("DomainExist WithCacheOnly failed")
- t.FailNow()
+ t.Fatalf("DomainExist WithCacheOnly failed")
}
_, err = serviceUtil.ProjectExist(context.Background(), "", "")
if err == nil {
- fmt.Printf("DomainExist failed")
- t.FailNow()
+ t.Fatalf("DomainExist failed")
}
}
func TestNewProject(t *testing.T) {
err := serviceUtil.NewProject(context.Background(), "", "")
if err == nil {
- fmt.Printf("NewProject failed")
- t.FailNow()
+ t.Fatalf("NewProject failed")
}
}
func TestNewDomainProject(t *testing.T) {
err := serviceUtil.NewDomainProject(context.Background(), "", "")
if err == nil {
- fmt.Printf("NewDomainProject failed")
- t.FailNow()
+ t.Fatalf("NewDomainProject failed")
}
}
diff --git a/server/service/util/heartbeat_util_test.go b/server/service/util/heartbeat_util_test.go
index 4e2cadb..26ef036 100644
--- a/server/service/util/heartbeat_util_test.go
+++ b/server/service/util/heartbeat_util_test.go
@@ -17,7 +17,6 @@
package util_test
import (
- "fmt"
serviceUtil "github.com/apache/incubator-servicecomb-service-center/server/service/util"
"golang.org/x/net/context"
"testing"
@@ -35,13 +34,11 @@ func TestHeartbeatUtil(t *testing.T) {
func TestKeepAliveLease(t *testing.T) {
_, err := serviceUtil.KeepAliveLease(context.Background(), "", "", "", -1)
if err == nil {
- fmt.Printf("KeepAliveLease -1 failed")
- t.FailNow()
+ t.Fatalf("KeepAliveLease -1 failed")
}
_, err = serviceUtil.KeepAliveLease(context.Background(), "", "", "", 0)
if err == nil {
- fmt.Printf("KeepAliveLease failed")
- t.FailNow()
+ t.Fatalf("KeepAliveLease failed")
}
}
diff --git a/server/service/util/instance_util_test.go b/server/service/util/instance_util_test.go
index cfe3224..3293f39 100644
--- a/server/service/util/instance_util_test.go
+++ b/server/service/util/instance_util_test.go
@@ -17,7 +17,6 @@
package util
import (
- "fmt"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
"golang.org/x/net/context"
@@ -28,62 +27,53 @@ func TestGetLeaseId(t *testing.T) {
_, err := GetLeaseId(util.SetContext(context.Background(), "cacheOnly", "1"), "", "", "")
if err != nil {
- fmt.Printf(`GetLeaseId WithCacheOnly failed`)
- t.FailNow()
+ t.Fatalf(`GetLeaseId WithCacheOnly failed`)
}
_, err = GetLeaseId(context.Background(), "", "", "")
if err == nil {
- fmt.Printf(`GetLeaseId failed`)
- t.FailNow()
+ t.Fatalf(`GetLeaseId failed`)
}
}
func TestGetInstance(t *testing.T) {
_, err := GetInstance(util.SetContext(context.Background(), "cacheOnly", "1"), "", "", "")
if err != nil {
- fmt.Printf(`GetInstance WithCacheOnly failed`)
- t.FailNow()
+ t.Fatalf(`GetInstance WithCacheOnly failed`)
}
_, err = GetInstance(context.Background(), "", "", "")
if err == nil {
- fmt.Printf(`GetInstance failed`)
- t.FailNow()
+ t.Fatalf(`GetInstance failed`)
}
_, err = GetAllInstancesOfOneService(util.SetContext(context.Background(), "cacheOnly", "1"), "", "")
if err != nil {
- fmt.Printf(`GetAllInstancesOfOneService WithCacheOnly failed`)
- t.FailNow()
+ t.Fatalf(`GetAllInstancesOfOneService WithCacheOnly failed`)
}
_, err = GetAllInstancesOfOneService(context.Background(), "", "")
if err == nil {
- fmt.Printf(`GetAllInstancesOfOneService failed`)
- t.FailNow()
+ t.Fatalf(`GetAllInstancesOfOneService failed`)
}
QueryAllProvidersInstances(context.Background(), "")
_, err = queryServiceInstancesKvs(context.Background(), "", 0)
if err == nil {
- fmt.Printf(`queryServiceInstancesKvs failed`)
- t.FailNow()
+ t.Fatalf(`queryServiceInstancesKvs failed`)
}
}
func TestInstanceExist(t *testing.T) {
_, err := InstanceExist(util.SetContext(context.Background(), "cacheOnly", "1"), "", "", "")
if err != nil {
- fmt.Printf(`InstanceExist WithCacheOnly failed`)
- t.FailNow()
+ t.Fatalf(`InstanceExist WithCacheOnly failed`)
}
_, err = InstanceExist(context.Background(), "", "", "")
if err == nil {
- fmt.Printf(`InstanceExist failed`)
- t.FailNow()
+ t.Fatalf(`InstanceExist failed`)
}
}
@@ -92,31 +82,27 @@ func TestCheckEndPoints(t *testing.T) {
ServiceId: "a",
})
if err == nil {
- fmt.Printf(`CheckEndPoints failed`)
- t.FailNow()
+ t.Fatalf(`CheckEndPoints failed`)
}
}
func TestDeleteServiceAllInstances(t *testing.T) {
err := DeleteServiceAllInstances(context.Background(), "")
if err == nil {
- fmt.Printf(`DeleteServiceAllInstances failed`)
- t.FailNow()
+ t.Fatalf(`DeleteServiceAllInstances failed`)
}
}
func TestParseEndpointValue(t *testing.T) {
epv := ParseEndpointIndexValue([]byte("x/y"))
if epv.serviceId != "x" || epv.instanceId != "y" {
- fmt.Printf(`ParseEndpointIndexValue failed`)
- t.FailNow()
+ t.Fatalf(`ParseEndpointIndexValue failed`)
}
}
func TestGetInstanceCountOfOneService(t *testing.T) {
_, err := GetInstanceCountOfOneService(context.Background(), "", "")
if err == nil {
- fmt.Printf(`GetInstanceCountOfOneService failed`)
- t.FailNow()
+ t.Fatalf(`GetInstanceCountOfOneService failed`)
}
}
diff --git a/server/service/util/rule_util_test.go b/server/service/util/rule_util_test.go
index 5cf142a..e029d2b 100644
--- a/server/service/util/rule_util_test.go
+++ b/server/service/util/rule_util_test.go
@@ -17,7 +17,6 @@
package util_test
import (
- "fmt"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
serviceUtil "github.com/apache/incubator-servicecomb-service-center/server/service/util"
@@ -34,42 +33,36 @@ func TestRuleFilter_Filter(t *testing.T) {
}
_, err := rf.Filter(context.Background(), "")
if err != nil {
- fmt.Printf("RuleFilter Filter failed")
- t.FailNow()
+ t.Fatalf("RuleFilter Filter failed")
}
}
func TestGetRulesUtil(t *testing.T) {
_, err := serviceUtil.GetRulesUtil(util.SetContext(context.Background(), "cacheOnly", "1"), "", "")
if err != nil {
- fmt.Printf("GetRulesUtil WithCacheOnly failed")
- t.FailNow()
+ t.Fatalf("GetRulesUtil WithCacheOnly failed")
}
_, err = serviceUtil.GetRulesUtil(context.Background(), "", "")
if err == nil {
- fmt.Printf("GetRulesUtil failed")
- t.FailNow()
+ t.Fatalf("GetRulesUtil failed")
}
_, err = serviceUtil.GetOneRule(util.SetContext(context.Background(), "cacheOnly", "1"), "", "", "")
if err != nil {
- fmt.Printf("GetOneRule WithCacheOnly failed")
- t.FailNow()
+ t.Fatalf("GetOneRule WithCacheOnly failed")
}
_, err = serviceUtil.GetOneRule(context.Background(), "", "", "")
if err == nil {
- fmt.Printf("GetOneRule failed")
- t.FailNow()
+ t.Fatalf("GetOneRule failed")
}
}
func TestRuleExist(t *testing.T) {
defer func() {
if r := recover(); r != nil {
- fmt.Printf("TestRuleExist panic")
- t.FailNow()
+ t.Fatalf("TestRuleExist panic")
}
}()
serviceUtil.RuleExist(util.SetContext(context.Background(), "cacheOnly", "1"), "", "", "", "")
@@ -79,14 +72,12 @@ func TestRuleExist(t *testing.T) {
func TestGetServiceRuleType(t *testing.T) {
_, _, err := serviceUtil.GetServiceRuleType(util.SetContext(context.Background(), "cacheOnly", "1"), "", "")
if err != nil {
- fmt.Printf("GetServiceRuleType WithCacheOnly failed")
- t.FailNow()
+ t.Fatalf("GetServiceRuleType WithCacheOnly failed")
}
_, _, err = serviceUtil.GetServiceRuleType(context.Background(), "", "")
if err == nil {
- fmt.Printf("GetServiceRuleType failed")
- t.FailNow()
+ t.Fatalf("GetServiceRuleType failed")
}
}
@@ -97,8 +88,7 @@ func TestAllowAcrossApp(t *testing.T) {
AppId: "a",
})
if err != nil {
- fmt.Printf("AllowAcrossApp with the same appId and no property failed")
- t.FailNow()
+ t.Fatalf("AllowAcrossApp with the same appId and no property failed")
}
err = serviceUtil.AllowAcrossDimension(context.Background(), &proto.MicroService{
@@ -107,8 +97,7 @@ func TestAllowAcrossApp(t *testing.T) {
AppId: "c",
})
if err == nil {
- fmt.Printf("AllowAcrossApp with the diff appId and no property failed")
- t.FailNow()
+ t.Fatalf("AllowAcrossApp with the diff appId and no property failed")
}
err = serviceUtil.AllowAcrossDimension(context.Background(), &proto.MicroService{
@@ -120,8 +109,7 @@ func TestAllowAcrossApp(t *testing.T) {
AppId: "a",
})
if err != nil {
- fmt.Printf("AllowAcrossApp with the same appId and allow property failed")
- t.FailNow()
+ t.Fatalf("AllowAcrossApp with the same appId and allow property failed")
}
err = serviceUtil.AllowAcrossDimension(context.Background(), &proto.MicroService{
@@ -133,8 +121,7 @@ func TestAllowAcrossApp(t *testing.T) {
AppId: "b",
})
if err != nil {
- fmt.Printf("AllowAcrossApp with the diff appId and allow property failed")
- t.FailNow()
+ t.Fatalf("AllowAcrossApp with the diff appId and allow property failed")
}
err = serviceUtil.AllowAcrossDimension(context.Background(), &proto.MicroService{
@@ -146,8 +133,7 @@ func TestAllowAcrossApp(t *testing.T) {
AppId: "b",
})
if err == nil {
- fmt.Printf("AllowAcrossApp with the diff appId and deny property failed")
- t.FailNow()
+ t.Fatalf("AllowAcrossApp with the diff appId and deny property failed")
}
err = serviceUtil.AllowAcrossDimension(context.Background(), &proto.MicroService{
@@ -159,8 +145,7 @@ func TestAllowAcrossApp(t *testing.T) {
AppId: "b",
})
if err == nil {
- fmt.Printf("AllowAcrossApp with the diff appId and empty property failed")
- t.FailNow()
+ t.Fatalf("AllowAcrossApp with the diff appId and empty property failed")
}
}
@@ -173,8 +158,7 @@ func TestMatchRules(t *testing.T) {
},
}, nil, nil)
if err == nil {
- fmt.Printf("MatchRules nil failed")
- t.FailNow()
+ t.Fatalf("MatchRules nil failed")
}
err = serviceUtil.MatchRules([]*proto.ServiceRule{
@@ -185,8 +169,7 @@ func TestMatchRules(t *testing.T) {
},
}, &proto.MicroService{}, nil)
if err == nil {
- fmt.Printf("MatchRules invalid WHITE failed")
- t.FailNow()
+ t.Fatalf("MatchRules invalid WHITE failed")
}
err = serviceUtil.MatchRules([]*proto.ServiceRule{
@@ -199,8 +182,7 @@ func TestMatchRules(t *testing.T) {
ServiceName: "a",
}, nil)
if err != nil {
- fmt.Printf("MatchRules WHITE with field ServiceName failed")
- t.FailNow()
+ t.Fatalf("MatchRules WHITE with field ServiceName failed")
}
err = serviceUtil.MatchRules([]*proto.ServiceRule{
@@ -213,8 +195,7 @@ func TestMatchRules(t *testing.T) {
"a": "b",
})
if err != nil {
- fmt.Printf("MatchRules WHITE with tag b failed")
- t.FailNow()
+ t.Fatalf("MatchRules WHITE with tag b failed")
}
err = serviceUtil.MatchRules([]*proto.ServiceRule{
@@ -227,8 +208,7 @@ func TestMatchRules(t *testing.T) {
"a": "c",
})
if err == nil {
- fmt.Printf("MatchRules WHITE with tag c failed")
- t.FailNow()
+ t.Fatalf("MatchRules WHITE with tag c failed")
}
err = serviceUtil.MatchRules([]*proto.ServiceRule{
@@ -241,8 +221,7 @@ func TestMatchRules(t *testing.T) {
"a": "b",
})
if err == nil {
- fmt.Printf("MatchRules BLACK with tag b failed")
- t.FailNow()
+ t.Fatalf("MatchRules BLACK with tag b failed")
}
err = serviceUtil.MatchRules([]*proto.ServiceRule{
@@ -255,8 +234,7 @@ func TestMatchRules(t *testing.T) {
ServiceName: "a",
}, nil)
if err == nil {
- fmt.Printf("MatchRules BLACK with field ServiceName failed")
- t.FailNow()
+ t.Fatalf("MatchRules BLACK with field ServiceName failed")
}
err = serviceUtil.MatchRules([]*proto.ServiceRule{
@@ -269,8 +247,7 @@ func TestMatchRules(t *testing.T) {
"a": "c",
})
if err != nil {
- fmt.Printf("MatchRules BLACK with tag c failed")
- t.FailNow()
+ t.Fatalf("MatchRules BLACK with tag c failed")
}
err = serviceUtil.MatchRules([]*proto.ServiceRule{
@@ -283,24 +260,21 @@ func TestMatchRules(t *testing.T) {
"b": "b",
})
if err != nil {
- fmt.Printf("MatchRules with not exist tag failed")
- t.FailNow()
+ t.Fatalf("MatchRules with not exist tag failed")
}
}
func TestGetConsumer(t *testing.T) {
_, _, err := serviceUtil.GetConsumerIdsByProvider(context.Background(), "", &proto.MicroService{})
if err == nil {
- fmt.Printf("GetConsumerIdsByProvider invalid failed")
- t.FailNow()
+ t.Fatalf("GetConsumerIdsByProvider invalid failed")
}
_, _, err = serviceUtil.GetConsumerIdsByProvider(context.Background(), "", &proto.MicroService{
ServiceId: "a",
})
if err == nil {
- fmt.Printf("GetConsumerIdsByProvider not exist service failed")
- t.FailNow()
+ t.Fatalf("GetConsumerIdsByProvider not exist service failed")
}
_, err = serviceUtil.GetConsumersInCache(util.SetContext(context.Background(), "cacheOnly", "1"), "",
@@ -308,8 +282,7 @@ func TestGetConsumer(t *testing.T) {
ServiceId: "a",
})
if err != nil {
- fmt.Printf("GetConsumersInCache WithCacheOnly failed")
- t.FailNow()
+ t.Fatalf("GetConsumersInCache WithCacheOnly failed")
}
}
@@ -319,34 +292,29 @@ func TestGetProvider(t *testing.T) {
ServiceId: "a",
})
if err != nil {
- fmt.Printf("GetProvidersInCache WithCacheOnly failed")
- t.FailNow()
+ t.Fatalf("GetProvidersInCache WithCacheOnly failed")
}
_, _, err = serviceUtil.GetProviderIdsByConsumer(context.Background(), "", &proto.MicroService{})
if err == nil {
- fmt.Printf("GetProviderIdsByConsumer invalid failed")
- t.FailNow()
+ t.Fatalf("GetProviderIdsByConsumer invalid failed")
}
_, _, err = serviceUtil.GetProviderIdsByConsumer(util.SetContext(context.Background(), "cacheOnly", "1"),
"", &proto.MicroService{})
if err != nil {
- fmt.Printf("GetProviderIdsByConsumer WithCacheOnly failed")
- t.FailNow()
+ t.Fatalf("GetProviderIdsByConsumer WithCacheOnly failed")
}
}
func TestAccessible(t *testing.T) {
err := serviceUtil.Accessible(context.Background(), "", "")
if err.StatusCode() != http.StatusInternalServerError {
- fmt.Printf("Accessible invalid failed")
- t.FailNow()
+ t.Fatalf("Accessible invalid failed")
}
err = serviceUtil.Accessible(util.SetContext(context.Background(), "cacheOnly", "1"), "", "")
if err.StatusCode() == http.StatusInternalServerError {
- fmt.Printf("Accessible WithCacheOnly failed")
- t.FailNow()
+ t.Fatalf("Accessible WithCacheOnly failed")
}
}
diff --git a/server/service/util/tag_util_test.go b/server/service/util/tag_util_test.go
index 0f13648..935d4c2 100644
--- a/server/service/util/tag_util_test.go
+++ b/server/service/util/tag_util_test.go
@@ -17,7 +17,6 @@
package util_test
import (
- "fmt"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
serviceUtil "github.com/apache/incubator-servicecomb-service-center/server/service/util"
"golang.org/x/net/context"
@@ -27,21 +26,18 @@ import (
func TestAddTagIntoETCD(t *testing.T) {
err := serviceUtil.AddTagIntoETCD(context.Background(), "", "", map[string]string{"a": "1"})
if err == nil {
- fmt.Printf(`AddTagIntoETCD with {"a": "1"} tags failed`)
- t.FailNow()
+ t.Fatalf(`AddTagIntoETCD with {"a": "1"} tags failed`)
}
}
func TestGetTagsUtils(t *testing.T) {
_, err := serviceUtil.GetTagsUtils(util.SetContext(context.Background(), "cacheOnly", "1"), "", "")
if err != nil {
- fmt.Printf(`GetTagsUtils WithCacheOnly failed`)
- t.FailNow()
+ t.Fatalf(`GetTagsUtils WithCacheOnly failed`)
}
_, err = serviceUtil.GetTagsUtils(context.Background(), "", "")
if err == nil {
- fmt.Printf(`GetTagsUtils failed`)
- t.FailNow()
+ t.Fatalf(`GetTagsUtils failed`)
}
}
diff --git a/server/service/util/util_suite_test.go b/server/service/util/util_suite_test.go
index 7cbb461..c1dffa6 100644
--- a/server/service/util/util_suite_test.go
+++ b/server/service/util/util_suite_test.go
@@ -159,36 +159,31 @@ func TestSetDefault(t *testing.T) {
serviceUtil.SetServiceDefaultValue(service)
if len(service.Level) == 0 ||
len(service.Status) == 0 {
- fmt.Printf(`TestSetDefault failed`)
- t.FailNow()
+ t.Fatalf(`TestSetDefault failed`)
}
}
func TestGetOneDomainProjectServiceCount(t *testing.T) {
_, err := serviceUtil.GetOneDomainProjectServiceCount(util.SetContext(context.Background(), "cacheOnly", "1"), "")
if err != nil {
- fmt.Printf("GetOneDomainProjectServiceCount WithCacheOnly failed")
- t.FailNow()
+ t.Fatalf("GetOneDomainProjectServiceCount WithCacheOnly failed")
}
_, err = serviceUtil.GetOneDomainProjectServiceCount(context.Background(), "")
if err == nil {
- fmt.Printf("GetOneDomainProjectServiceCount failed")
- t.FailNow()
+ t.Fatalf("GetOneDomainProjectServiceCount failed")
}
}
func TestGetOneDomainProjectInstanceCount(t *testing.T) {
_, err := serviceUtil.GetOneDomainProjectInstanceCount(util.SetContext(context.Background(), "cacheOnly", "1"), "")
if err != nil {
- fmt.Printf("GetOneDomainProjectInstanceCount WithCacheOnly failed")
- t.FailNow()
+ t.Fatalf("GetOneDomainProjectInstanceCount WithCacheOnly failed")
}
_, err = serviceUtil.GetOneDomainProjectInstanceCount(context.Background(), "")
if err == nil {
- fmt.Printf("GetOneDomainProjectInstanceCount failed")
- t.FailNow()
+ t.Fatalf("GetOneDomainProjectInstanceCount failed")
}
}
--
To stop receiving notification emails like this one, please contact
littlecui@apache.org.