You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by as...@apache.org on 2018/08/13 05:07:37 UTC
[incubator-servicecomb-service-center] branch master updated:
SCB-792 More abundant metrics information (#413)
This is an automated email from the ASF dual-hosted git repository.
asifdxtreme pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 447a47a SCB-792 More abundant metrics information (#413)
447a47a is described below
commit 447a47ad72afbcca2c175701bd8d57cb068b734d
Author: little-cui <su...@qq.com>
AuthorDate: Mon Aug 13 13:07:35 2018 +0800
SCB-792 More abundant metrics information (#413)
* SCB-792 More abundant metrics information
* Fix UT failure
---
server/metric/metrics.go => pkg/buffer/pool.go | 49 ++++++-------
.../metric/metrics.go => pkg/buffer/pool_test.go | 61 ++++++++--------
pkg/rest/route.go | 2 +-
pkg/util/concurrent_map.go | 64 ++++++++---------
pkg/util/concurrent_map_go19.go | 81 ++++++++++++++++++++++
pkg/util/concurrent_map_test.go | 69 ++++++++++++------
pkg/util/reflect.go | 2 +-
server/metric/calculator.go | 59 +++++++++++-----
server/metric/calculator_test.go | 29 ++++----
server/metric/common.go | 1 +
server/metric/gatherer.go | 4 +-
server/metric/metrics.go | 79 +++++++++++++++++----
server/metric/metrics_test.go | 65 +++++++++++++++++
server/plugin/infra/registry/etcd/etcd.go | 14 ++--
server/plugin/infra/registry/etcd/etcd_test.go | 34 ++++-----
server/rest/metrics.go | 2 +-
16 files changed, 434 insertions(+), 181 deletions(-)
diff --git a/server/metric/metrics.go b/pkg/buffer/pool.go
similarity index 50%
copy from server/metric/metrics.go
copy to pkg/buffer/pool.go
index 0afa93c..66d7ecd 100644
--- a/server/metric/metrics.go
+++ b/pkg/buffer/pool.go
@@ -14,41 +14,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package metric
+package buffer
-import "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+import (
+ "bytes"
+ "sync"
+)
-func NewMetrics() *Metrics {
- return &Metrics{
- ConcurrentMap: util.NewConcurrentMap(0),
- }
+type Pool struct {
+ pool sync.Pool
}
-type Metrics struct {
- *util.ConcurrentMap
+func (p *Pool) Get() *bytes.Buffer {
+ return p.pool.Get().(*bytes.Buffer)
}
-func (cm *Metrics) Put(key string, val float64) (old float64) {
- old, _ = cm.ConcurrentMap.Put(key, val).(float64)
- return
+func (p *Pool) Put(buf *bytes.Buffer) {
+ buf.Reset()
+ p.pool.Put(buf)
}
-func (cm *Metrics) Get(key string) (val float64) {
- if v, ok := cm.ConcurrentMap.Get(key); ok {
- val, _ = v.(float64)
+func NewPool(s int) *Pool {
+ return &Pool{
+ pool: sync.Pool{
+ New: func() interface{} {
+ b := bytes.NewBuffer(make([]byte, s))
+ b.Reset()
+ return b
+ },
+ },
}
- return
-}
-
-func (cm *Metrics) Remove(key string) (old float64) {
- old, _ = cm.ConcurrentMap.Remove(key).(float64)
- return
-}
-
-func (cm *Metrics) ForEach(f func(k string, v float64) (next bool)) {
- cm.ConcurrentMap.ForEach(func(item util.MapItem) (next bool) {
- k, _ := item.Key.(string)
- v, _ := item.Value.(float64)
- return f(k, v)
- })
}
diff --git a/server/metric/metrics.go b/pkg/buffer/pool_test.go
similarity index 50%
copy from server/metric/metrics.go
copy to pkg/buffer/pool_test.go
index 0afa93c..959b259 100644
--- a/server/metric/metrics.go
+++ b/pkg/buffer/pool_test.go
@@ -14,41 +14,42 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package metric
+package buffer
-import "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+import (
+ "strings"
+ "testing"
+)
-func NewMetrics() *Metrics {
- return &Metrics{
- ConcurrentMap: util.NewConcurrentMap(0),
+func TestNewPool(t *testing.T) {
+ p := NewPool(10)
+ b := p.Get()
+ if b == nil {
+ t.Fatalf("TestNewPool falied")
}
-}
-
-type Metrics struct {
- *util.ConcurrentMap
-}
-
-func (cm *Metrics) Put(key string, val float64) (old float64) {
- old, _ = cm.ConcurrentMap.Put(key, val).(float64)
- return
-}
-
-func (cm *Metrics) Get(key string) (val float64) {
- if v, ok := cm.ConcurrentMap.Get(key); ok {
- val, _ = v.(float64)
+ b.WriteString("a")
+ if b.String() != "a" {
+ t.Fatalf("TestNewPool falied")
+ }
+ p.Put(b)
+ b = p.Get()
+ if b == nil || b.Len() != 0 {
+ t.Fatalf("TestNewPool falied")
}
- return
-}
-
-func (cm *Metrics) Remove(key string) (old float64) {
- old, _ = cm.ConcurrentMap.Remove(key).(float64)
- return
}
-func (cm *Metrics) ForEach(f func(k string, v float64) (next bool)) {
- cm.ConcurrentMap.ForEach(func(item util.MapItem) (next bool) {
- k, _ := item.Key.(string)
- v, _ := item.Value.(float64)
- return f(k, v)
+func BenchmarkNewPool(b *testing.B) {
+ p := NewPool(4 * 10)
+ s := strings.Repeat("a", 4*1024)
+ b.ResetTimer()
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ buf := p.Get()
+ buf.WriteString(s)
+ _ = buf.String()
+ p.Put(buf)
+ }
})
+ b.ReportAllocs()
+ // 2000000 872 ns/op 4098 B/op 1 allocs/op
}
diff --git a/pkg/rest/route.go b/pkg/rest/route.go
index 7e2088c..084fe44 100644
--- a/pkg/rest/route.go
+++ b/pkg/rest/route.go
@@ -71,7 +71,7 @@ func (this *ROAServerHandler) addRoute(route *Route) (err error) {
this.handlers[method] = append(this.handlers[method], &urlPatternHandler{
util.FormatFuncName(util.FuncName(route.Func)), route.Path, http.HandlerFunc(route.Func)})
- util.Logger().Infof("register route %s(%s).", route.Path, method)
+ util.Logger().Infof("register route %s(%s)", route.Path, method)
return nil
}
diff --git a/pkg/util/concurrent_map.go b/pkg/util/concurrent_map.go
index fce2c3c..6c6a484 100644
--- a/pkg/util/concurrent_map.go
+++ b/pkg/util/concurrent_map.go
@@ -1,19 +1,20 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build !go1.9
+
package util
import "sync"
@@ -38,10 +39,10 @@ func (cm *ConcurrentMap) init() {
cm.once.Do(cm.resize)
}
-func (cm *ConcurrentMap) Put(key, val interface{}) (old interface{}) {
+func (cm *ConcurrentMap) Put(key, val interface{}) {
cm.init()
cm.mux.Lock()
- old, cm.items[key] = cm.items[key], val
+ cm.items[key] = val
cm.mux.Unlock()
return
}
@@ -85,14 +86,10 @@ func (cm *ConcurrentMap) Get(key interface{}) (val interface{}, b bool) {
return
}
-func (cm *ConcurrentMap) Remove(key interface{}) (old interface{}) {
- var b bool
+func (cm *ConcurrentMap) Remove(key interface{}) {
cm.init()
cm.mux.Lock()
- old, b = cm.items[key]
- if b {
- delete(cm.items, key)
- }
+ delete(cm.items, key)
cm.mux.Unlock()
return
}
@@ -115,25 +112,22 @@ func (cm *ConcurrentMap) ForEach(f func(item MapItem) (next bool)) {
return
}
// avoid dead lock in function 'f'
- ch := make(chan MapItem, s)
+ ch := make([]MapItem, 0, s)
for k, v := range cm.items {
- ch <- MapItem{k, v}
+ ch = append(ch, MapItem{k, v})
}
cm.mux.RUnlock()
- for {
- select {
- case item := <-ch:
- if b := f(item); b {
- continue
- }
- default:
+ for _, i := range ch {
+ if b := f(i); b {
+ continue
}
break
}
- close(ch)
}
func NewConcurrentMap(size int) *ConcurrentMap {
- return &ConcurrentMap{size: size}
+ c := &ConcurrentMap{size: size}
+ c.init()
+ return c
}
diff --git a/pkg/util/concurrent_map_go19.go b/pkg/util/concurrent_map_go19.go
new file mode 100644
index 0000000..76a7d22
--- /dev/null
+++ b/pkg/util/concurrent_map_go19.go
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build go1.9
+
+package util
+
+import "sync"
+
+type MapItem struct {
+ Key interface{}
+ Value interface{}
+}
+
+type ConcurrentMap struct {
+ mapper sync.Map
+}
+
+func (cm *ConcurrentMap) Put(key, val interface{}) {
+ cm.mapper.Store(key, val)
+ return
+}
+
+func (cm *ConcurrentMap) PutIfAbsent(key, val interface{}) (exist interface{}) {
+ exist, _ = cm.mapper.LoadOrStore(key, val)
+ return
+}
+
+func (cm *ConcurrentMap) Fetch(key interface{}, f func() (interface{}, error)) (exist interface{}, err error) {
+ if exist, b := cm.mapper.Load(key); b {
+ return exist, nil
+ }
+ v, err := f()
+ if err != nil {
+ return nil, err
+ }
+ exist, _ = cm.mapper.LoadOrStore(key, v)
+ return
+}
+
+func (cm *ConcurrentMap) Get(key interface{}) (val interface{}, b bool) {
+ return cm.mapper.Load(key)
+}
+
+func (cm *ConcurrentMap) Remove(key interface{}) {
+ cm.mapper.Delete(key)
+}
+
+func (cm *ConcurrentMap) Clear() {
+ cm.mapper = sync.Map{}
+}
+
+func (cm *ConcurrentMap) Size() (s int) {
+ cm.mapper.Range(func(_, _ interface{}) bool {
+ s++
+ return true
+ })
+ return
+}
+
+func (cm *ConcurrentMap) ForEach(f func(item MapItem) (next bool)) {
+ cm.mapper.Range(func(key, value interface{}) bool {
+ return f(MapItem{key, value})
+ })
+}
+
+func NewConcurrentMap(_ int) *ConcurrentMap {
+ return new(ConcurrentMap)
+}
diff --git a/pkg/util/concurrent_map_test.go b/pkg/util/concurrent_map_test.go
index 9259d62..f4596fb 100644
--- a/pkg/util/concurrent_map_test.go
+++ b/pkg/util/concurrent_map_test.go
@@ -17,6 +17,7 @@
package util
import (
+ "errors"
"testing"
)
@@ -30,16 +31,14 @@ func TestConcurrentMap(t *testing.T) {
if b || v != nil {
t.Fatalf("TestConcurrentMap Get a not exist item failed.")
}
- v = cm.Put("a", "1")
- if v != nil {
- t.Fatalf("TestConcurrentMap Put a new item failed.")
- }
+ cm.Put("a", "1")
v, b = cm.Get("a")
if !b || v.(string) != "1" {
t.Fatalf("TestConcurrentMap Get an exist item failed.")
}
- v = cm.Put("a", "2")
- if v.(string) != "1" {
+ cm.Put("a", "2")
+ v, b = cm.Get("a")
+ if v.(string) != "2" {
t.Fatalf("TestConcurrentMap Put an item again failed.")
}
cm.PutIfAbsent("b", "1")
@@ -48,10 +47,7 @@ func TestConcurrentMap(t *testing.T) {
if !b || v.(string) != "2" {
t.Fatalf("TestConcurrentMap Get an item after PutIfAbsent failed.")
}
- v = cm.Remove("a")
- if v.(string) != "2" {
- t.Fatalf("TestConcurrentMap Remove an item failed.")
- }
+ cm.Remove("a")
v, b = cm.Get("a")
if b || v != nil {
t.Fatalf("TestConcurrentMap Get an item after Remove failed.")
@@ -90,10 +86,25 @@ func TestConcurrentMap_ForEach(t *testing.T) {
}
}
-func TestNewConcurrentMap(t *testing.T) {
- cm := NewConcurrentMap(100)
- if cm.size != 100 {
- t.Fatalf("TestNewConcurrentMap failed.")
+func TestConcurrentMap_Fetch(t *testing.T) {
+ cm := ConcurrentMap{}
+ v, err := cm.Fetch("a", func() (interface{}, error) {
+ return "a", nil
+ })
+ if err != nil || v != "a" {
+ t.Fatalf("TestConcurrentMap_Fetch failed.")
+ }
+ v, err = cm.Fetch("a", func() (interface{}, error) {
+ return "b", nil
+ })
+ if err != nil || v != "a" {
+ t.Fatalf("TestConcurrentMap_Fetch failed.")
+ }
+ v, err = cm.Fetch("b", func() (interface{}, error) {
+ return nil, errors.New("err")
+ })
+ if err == nil || v != nil {
+ t.Fatalf("TestConcurrentMap_Fetch failed.")
}
}
@@ -108,7 +119,8 @@ func BenchmarkConcurrentMap_Get(b *testing.B) {
}
})
b.ReportAllocs()
- // 20000000 88.7 ns/op 0 B/op 0 allocs/op
+ // go1.9- 20000000 95.8 ns/op 0 B/op 0 allocs/op
+ // go1.9+ 50000000 30.2 ns/op 0 B/op 0 allocs/op
}
func BenchmarkConcurrentMap_Put(b *testing.B) {
@@ -120,21 +132,22 @@ func BenchmarkConcurrentMap_Put(b *testing.B) {
}
})
b.ReportAllocs()
- // 3000000 420 ns/op 32 B/op 2 allocs/op
+ // go1.9- 3000000 424 ns/op 32 B/op 2 allocs/op
+ // go1.9+ 5000000 333 ns/op 16 B/op 1 allocs/op
}
func BenchmarkConcurrentMap_PutAndGet(b *testing.B) {
- var v interface{}
cm := &ConcurrentMap{}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
cm.Put("a", "1")
- v, _ = cm.Get("a")
+ _, _ = cm.Get("a")
}
})
b.ReportAllocs()
- // 3000000 560 ns/op 32 B/op 2 allocs/op
+ // go1.9- 5000000 300 ns/op 32 B/op 2 allocs/op
+ // go1.9+ 10000000 150 ns/op 16 B/op 1 allocs/op
}
func BenchmarkConcurrentMap_ForEach(b *testing.B) {
@@ -151,5 +164,21 @@ func BenchmarkConcurrentMap_ForEach(b *testing.B) {
}
})
b.ReportAllocs()
- // 500000 3148 ns/op 3296 B/op 2 allocs/op
+ // go1.9- 1000000 1096 ns/op 3200 B/op 1 allocs/op
+ // go1.9+ 3000000 394 ns/op 0 B/op 0 allocs/op
+}
+
+func BenchmarkConcurrentMap_Fetch(b *testing.B) {
+ cm := ConcurrentMap{}
+ b.ResetTimer()
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ _, _ = cm.Fetch("a", func() (interface{}, error) {
+ return "a", nil
+ })
+ }
+ })
+ b.ReportAllocs()
+ // go1.9- 20000000 101 ns/op 16 B/op 1 allocs/op
+ // go1.9+ 200000000 8.63 ns/op 0 B/op 0 allocs/op
}
diff --git a/pkg/util/reflect.go b/pkg/util/reflect.go
index 73a5466..7ea324d 100644
--- a/pkg/util/reflect.go
+++ b/pkg/util/reflect.go
@@ -211,7 +211,7 @@ func FormatFuncName(f string) string {
}
_, fun := f[:i+j+1], f[i+j+2:]
i = strings.LastIndex(fun, ".")
- return fun[i+1:]
+ return strings.TrimSuffix(fun[i+1:], "-fm") // trim the suffix of function closure name
}
func FuncName(f interface{}) string {
diff --git a/server/metric/calculator.go b/server/metric/calculator.go
index 99fac21..523f7f5 100644
--- a/server/metric/calculator.go
+++ b/server/metric/calculator.go
@@ -25,39 +25,47 @@ var (
)
type Calculator interface {
- Calc(mf *dto.MetricFamily) float64
+ Calc(mf *dto.MetricFamily) *Details
}
type CommonCalculator struct {
}
// Get value of metricFamily
-func (c *CommonCalculator) Calc(mf *dto.MetricFamily) float64 {
+func (c *CommonCalculator) Calc(mf *dto.MetricFamily) *Details {
if len(mf.GetMetric()) == 0 {
- return 0
+ return nil
}
+ details := NewDetails()
switch mf.GetType() {
case dto.MetricType_GAUGE:
- return mf.GetMetric()[0].GetGauge().GetValue()
+ metricGaugeOf(details, mf.GetMetric())
case dto.MetricType_COUNTER:
- return metricCounterOf(mf.GetMetric())
+ metricCounterOf(details, mf.GetMetric())
case dto.MetricType_SUMMARY:
- return metricSummaryOf(mf.GetMetric())
- default:
- return 0
+ metricSummaryOf(details, mf.GetMetric())
+ case dto.MetricType_HISTOGRAM:
+ metricHistogramOf(details, mf.GetMetric())
}
+ return details
}
-func metricCounterOf(m []*dto.Metric) float64 {
- var sum float64 = 0
+func metricGaugeOf(details *Details, m []*dto.Metric) {
for _, d := range m {
- sum += d.GetCounter().GetValue()
+ details.Value += d.GetGauge().GetValue()
+ details.Put(d.GetLabel(), d.GetGauge().GetValue())
}
- return sum
}
-func metricSummaryOf(m []*dto.Metric) float64 {
+func metricCounterOf(details *Details, m []*dto.Metric) {
+ for _, d := range m {
+ details.Value += d.GetCounter().GetValue()
+ details.Put(d.GetLabel(), d.GetCounter().GetValue())
+ }
+}
+
+func metricSummaryOf(details *Details, m []*dto.Metric) {
var (
count uint64 = 0
sum float64 = 0
@@ -65,19 +73,38 @@ func metricSummaryOf(m []*dto.Metric) float64 {
for _, d := range m {
count += d.GetSummary().GetSampleCount()
sum += d.GetSummary().GetSampleSum()
+ details.Put(d.GetLabel(), d.GetSummary().GetSampleSum()/float64(d.GetSummary().GetSampleCount()))
+ }
+
+ if count == 0 {
+ return
+ }
+
+ details.Value = sum / float64(count)
+}
+
+func metricHistogramOf(details *Details, m []*dto.Metric) {
+ var (
+ count uint64 = 0
+ sum float64 = 0
+ )
+ for _, d := range m {
+ count += d.GetHistogram().GetSampleCount()
+ sum += d.GetHistogram().GetSampleSum()
+ details.Put(d.GetLabel(), d.GetHistogram().GetSampleSum()/float64(d.GetHistogram().GetSampleCount()))
}
if count == 0 {
- return 0
+ return
}
- return sum / float64(count)
+ details.Value = sum / float64(count)
}
func RegisterCalculator(c Calculator) {
DefaultCalculator = c
}
-func Calculate(mf *dto.MetricFamily) float64 {
+func Calculate(mf *dto.MetricFamily) *Details {
return DefaultCalculator.Calc(mf)
}
diff --git a/server/metric/calculator_test.go b/server/metric/calculator_test.go
index 50bc42b..4634530 100644
--- a/server/metric/calculator_test.go
+++ b/server/metric/calculator_test.go
@@ -26,40 +26,45 @@ func TestCommonCalculator_Calc(t *testing.T) {
mf := &dto.MetricFamily{}
mt := dto.MetricType_UNTYPED
- v := float64(0)
+ v1 := float64(0)
+ v2 := float64(0)
n := uint64(0)
- if c.Calc(mf) != 0 {
+ if c.Calc(mf) != nil {
t.Fatalf("TestCommonCalculator_Calc failed")
}
mf = &dto.MetricFamily{Type: &mt, Metric: []*dto.Metric{{}}}
- if c.Calc(mf) != 0 {
+ if c.Calc(mf) == nil {
t.Fatalf("TestCommonCalculator_Calc failed")
}
mt = dto.MetricType_GAUGE
- v = 1
+ v1 = 1
+ v2 = 2
mf = &dto.MetricFamily{Type: &mt, Metric: []*dto.Metric{
- {Gauge: &dto.Gauge{Value: &v}}, {Gauge: &dto.Gauge{Value: &v}}}}
- if c.Calc(mf) != 1 {
+ {Gauge: &dto.Gauge{Value: &v1}}, {Gauge: &dto.Gauge{Value: &v2}}}}
+ details := c.Calc(mf)
+ if details.Value != 3 {
t.Fatalf("TestCommonCalculator_Calc failed")
}
mt = dto.MetricType_COUNTER
- v = 1
+ v1 = 1
mf = &dto.MetricFamily{Type: &mt, Metric: []*dto.Metric{
- {Counter: &dto.Counter{Value: &v}}, {Counter: &dto.Counter{Value: &v}}}}
- if c.Calc(mf) != 2 {
+ {Counter: &dto.Counter{Value: &v1}}, {Counter: &dto.Counter{Value: &v1}}}}
+ details = c.Calc(mf)
+ if details.Value != 2 {
t.Fatalf("TestCommonCalculator_Calc failed")
}
mt = dto.MetricType_SUMMARY
- v = 3
+ v1 = 3
n = 2
mf = &dto.MetricFamily{Type: &mt, Metric: []*dto.Metric{
- {Summary: &dto.Summary{SampleCount: &n, SampleSum: &v}}, {Summary: &dto.Summary{SampleCount: &n, SampleSum: &v}}}}
- if c.Calc(mf) != v/float64(n) {
+ {Summary: &dto.Summary{SampleCount: &n, SampleSum: &v1}}, {Summary: &dto.Summary{SampleCount: &n, SampleSum: &v1}}}}
+ details = c.Calc(mf)
+ if details.Value != v1/float64(n) {
t.Fatalf("TestCommonCalculator_Calc failed")
}
}
diff --git a/server/metric/common.go b/server/metric/common.go
index 16941e7..7b07c71 100644
--- a/server/metric/common.go
+++ b/server/metric/common.go
@@ -29,6 +29,7 @@ const (
defaultCollectPeriod = 30 * time.Second
FamilyName = "service_center"
familyNamePrefix = FamilyName + "_"
+ bufferSize = 1024
)
var (
diff --git a/server/metric/gatherer.go b/server/metric/gatherer.go
index 2b9a433..c3abb8b 100644
--- a/server/metric/gatherer.go
+++ b/server/metric/gatherer.go
@@ -85,7 +85,9 @@ func (mm *MetricsGatherer) Collect() error {
for _, mf := range mfs {
name := mf.GetName()
if _, ok := SysMetrics.Get(name); strings.Index(name, familyNamePrefix) == 0 || ok {
- mm.Records.Put(strings.TrimPrefix(name, familyNamePrefix), Calculate(mf))
+ if d := Calculate(mf); d != nil {
+ mm.Records.Put(strings.TrimPrefix(name, familyNamePrefix), d)
+ }
}
}
return nil
diff --git a/server/metric/metrics.go b/server/metric/metrics.go
index 0afa93c..d828cb9 100644
--- a/server/metric/metrics.go
+++ b/server/metric/metrics.go
@@ -16,39 +16,90 @@
*/
package metric
-import "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+import (
+ "github.com/apache/incubator-servicecomb-service-center/pkg/buffer"
+ "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+ dto "github.com/prometheus/client_model/go"
+ "strings"
+)
func NewMetrics() *Metrics {
return &Metrics{
- ConcurrentMap: util.NewConcurrentMap(0),
+ mapper: util.NewConcurrentMap(0),
}
}
-type Metrics struct {
- *util.ConcurrentMap
+func NewDetails() *Details {
+ return &Details{
+ mapper: util.NewConcurrentMap(0),
+ buffer: buffer.NewPool(bufferSize),
+ }
}
-func (cm *Metrics) Put(key string, val float64) (old float64) {
- old, _ = cm.ConcurrentMap.Put(key, val).(float64)
- return
+type Details struct {
+ Value float64
+
+ mapper *util.ConcurrentMap
+ buffer *buffer.Pool
}
-func (cm *Metrics) Get(key string) (val float64) {
- if v, ok := cm.ConcurrentMap.Get(key); ok {
- val, _ = v.(float64)
+// toKey formats labels as 'N1=V1,N2=V2,N3=V3,...,' where every name=value pair is followed by a comma.
+func (cm *Details) toKey(labels []*dto.LabelPair) string {
+ b := cm.buffer.Get()
+ for _, label := range labels {
+ b.WriteString(label.GetName())
+ b.WriteRune('=')
+ b.WriteString(label.GetValue())
+ b.WriteRune(',')
+ }
+ key := b.String()
+ cm.buffer.Put(b)
+ return key
+}
+
+func (cm *Details) toLabels(key string) (p []*dto.LabelPair) {
+ pairs := strings.Split(key, ",")
+ pairs = pairs[:len(pairs)-1]
+ p = make([]*dto.LabelPair, 0, len(pairs))
+ for _, pair := range pairs {
+ kv := strings.Split(pair, "=")
+ p = append(p, &dto.LabelPair{Name: &kv[0], Value: &kv[1]})
}
return
}
-func (cm *Metrics) Remove(key string) (old float64) {
- old, _ = cm.ConcurrentMap.Remove(key).(float64)
+func (cm *Details) Put(labels []*dto.LabelPair, val float64) {
+ cm.mapper.Put(cm.toKey(labels), val)
return
}
-func (cm *Metrics) ForEach(f func(k string, v float64) (next bool)) {
- cm.ConcurrentMap.ForEach(func(item util.MapItem) (next bool) {
+func (cm *Details) ForEach(f func(labels []*dto.LabelPair, v float64) (next bool)) {
+ cm.mapper.ForEach(func(item util.MapItem) (next bool) {
k, _ := item.Key.(string)
v, _ := item.Value.(float64)
+ return f(cm.toLabels(k), v)
+ })
+}
+
+type Metrics struct {
+ mapper *util.ConcurrentMap
+}
+
+func (cm *Metrics) Put(key string, val *Details) {
+ cm.mapper.Put(key, val)
+}
+
+func (cm *Metrics) Get(key string) (val float64) {
+ if v, ok := cm.mapper.Get(key); ok {
+ val = v.(*Details).Value
+ }
+ return
+}
+
+func (cm *Metrics) ForEach(f func(k string, v *Details) (next bool)) {
+ cm.mapper.ForEach(func(item util.MapItem) (next bool) {
+ k, _ := item.Key.(string)
+ v, _ := item.Value.(*Details)
return f(k, v)
})
}
diff --git a/server/metric/metrics_test.go b/server/metric/metrics_test.go
new file mode 100644
index 0000000..1e4a11d
--- /dev/null
+++ b/server/metric/metrics_test.go
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package metric
+
+import (
+ dto "github.com/prometheus/client_model/go"
+ "testing"
+)
+
+func TestDetails_ForEach(t *testing.T) {
+ n, v := "name", "value"
+ d := NewDetails()
+ d.ForEach(func(labels []*dto.LabelPair, v float64) (next bool) {
+ t.Fatalf("TestDetails_ForEach failed")
+ return true
+ })
+ d.Put([]*dto.LabelPair{{Name: &n, Value: &v}, {Name: &n, Value: &v}}, 1)
+ d.ForEach(func(labels []*dto.LabelPair, v float64) (next bool) {
+ if len(labels) != 2 || v != 1 {
+ t.Fatalf("TestDetails_ForEach failed")
+ }
+ return true
+ })
+ d.Put([]*dto.LabelPair{{}}, 3)
+ d.Put([]*dto.LabelPair{{}}, 4)
+ d.Put(nil, 2)
+ l := 0
+ d.ForEach(func(labels []*dto.LabelPair, v float64) (next bool) {
+ switch {
+ case len(labels) == 2:
+ if v != 1 {
+ t.Fatalf("TestDetails_ForEach failed")
+ }
+ case len(labels) == 0:
+ if v != 2 {
+ t.Fatalf("TestDetails_ForEach failed")
+ }
+ case len(labels) == 1:
+ if v != 4 {
+ t.Fatalf("TestDetails_ForEach failed")
+ }
+ default:
+ t.Fatalf("TestDetails_ForEach failed")
+ }
+ l++
+ return true
+ })
+ if l != 3 {
+ t.Fatalf("TestDetails_ForEach failed")
+ }
+}
diff --git a/server/plugin/infra/registry/etcd/etcd.go b/server/plugin/infra/registry/etcd/etcd.go
index 3205f3f..1786a70 100644
--- a/server/plugin/infra/registry/etcd/etcd.go
+++ b/server/plugin/infra/registry/etcd/etcd.go
@@ -113,10 +113,6 @@ func (c *EtcdClient) newClient() (*clientv3.Client, error) {
if err != nil {
return nil, err
}
- if len(c.Endpoints) == 1 {
- ReportBackendInstance(1)
- return client, nil
- }
defer func() {
if err != nil {
@@ -129,6 +125,14 @@ func (c *EtcdClient) newClient() (*clientv3.Client, error) {
if err != nil {
return nil, err
}
+
+ ReportBackendInstance(len(resp.Members))
+
+ if len(c.Endpoints) == 1 {
+ // no need to check remote endpoints
+ return client, nil
+ }
+
epLoop:
for _, ep := range c.Endpoints {
var cluster []string
@@ -148,7 +152,7 @@ epLoop:
err = fmt.Errorf("the etcd cluster endpoint list%v does not contain %s", cluster, ep)
return nil, err
}
- ReportBackendInstance(len(resp.Members))
+
return client, nil
}
diff --git a/server/plugin/infra/registry/etcd/etcd_test.go b/server/plugin/infra/registry/etcd/etcd_test.go
index 6b49ece..067b713 100644
--- a/server/plugin/infra/registry/etcd/etcd_test.go
+++ b/server/plugin/infra/registry/etcd/etcd_test.go
@@ -34,11 +34,14 @@ import (
"time"
)
-const dialTimeout = 500 * time.Millisecond
+const (
+ dialTimeout = 500 * time.Millisecond
+ endpoint = "127.0.0.1:2379"
+)
func TestEtcdClient(t *testing.T) {
etcd := &EtcdClient{
- Endpoints: []string{"127.0.0.1:2379"},
+ Endpoints: []string{endpoint},
DialTimeout: dialTimeout,
}
err := etcd.Initialize()
@@ -54,8 +57,9 @@ func TestEtcdClient(t *testing.T) {
}
// base test
+ registry.RegistryConfig().ClusterAddresses = endpoint
inst := NewRegistry()
- if inst == nil || firstEndpoint != "http://127.0.0.1:2379" {
+ if inst == nil || firstEndpoint != "http://"+endpoint {
t.Fatalf("TestEtcdClient failed, %#v", firstEndpoint)
}
old1 := registry.RegistryConfig().ClusterAddresses
@@ -324,7 +328,7 @@ func TestEtcdClient(t *testing.T) {
func TestEtcdClient_Compact(t *testing.T) {
etcd := &EtcdClient{
- Endpoints: []string{"127.0.0.1:2379"},
+ Endpoints: []string{endpoint},
DialTimeout: dialTimeout,
}
err := etcd.Initialize()
@@ -345,7 +349,7 @@ func TestEtcdClient_Compact(t *testing.T) {
func TestEtcdClient_Txn(t *testing.T) {
etcd := &EtcdClient{
- Endpoints: []string{"127.0.0.1:2379"},
+ Endpoints: []string{endpoint},
DialTimeout: dialTimeout,
}
err := etcd.Initialize()
@@ -415,7 +419,7 @@ func TestEtcdClient_Txn(t *testing.T) {
func TestEtcdClient_LeaseRenew(t *testing.T) {
etcd := &EtcdClient{
- Endpoints: []string{"127.0.0.1:2379"},
+ Endpoints: []string{endpoint},
DialTimeout: dialTimeout,
}
err := etcd.Initialize()
@@ -452,7 +456,7 @@ func TestEtcdClient_LeaseRenew(t *testing.T) {
func TestEtcdClient_HealthCheck(t *testing.T) {
etcd := &EtcdClient{
- Endpoints: []string{"127.0.0.1:2379"},
+ Endpoints: []string{endpoint},
DialTimeout: dialTimeout,
AutoSyncInterval: time.Millisecond,
}
@@ -481,7 +485,7 @@ func TestEtcdClient_HealthCheck(t *testing.T) {
if err != nil {
t.Fatalf("TestEtcdClient failed, %#v", err)
}
- etcd.Endpoints = []string{"127.0.0.1:2379"}
+ etcd.Endpoints = []string{endpoint}
etcd.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
@@ -508,7 +512,7 @@ func TestEtcdClient_HealthCheck(t *testing.T) {
func TestEtcdClient_Watch(t *testing.T) {
etcd := &EtcdClient{
- Endpoints: []string{"127.0.0.1:2379"},
+ Endpoints: []string{endpoint},
DialTimeout: dialTimeout,
}
err := etcd.Initialize()
@@ -669,7 +673,7 @@ func TestEtcdClient_Watch(t *testing.T) {
func TestNewRegistry(t *testing.T) {
etcd := &EtcdClient{
- Endpoints: []string{"127.0.0.1:2379", "0.0.0.0:2379"},
+ Endpoints: []string{endpoint, "0.0.0.0:2379"},
DialTimeout: dialTimeout,
AutoSyncInterval: time.Millisecond,
}
@@ -710,14 +714,10 @@ func TestWithTLS(t *testing.T) {
}
err = etcd.Initialize()
- if err != nil {
- t.Fatalf("TestEtcdClient failed, %#v", err)
- }
- defer etcd.Close()
-
- ctx, _ := context.WithTimeout(context.Background(), dialTimeout)
- err = etcd.SyncMembers(ctx)
+ // Initializing the etcd client checks the member list first,
+ // so it raises a gRPC error rather than a TLS error.
if _, ok := status.FromError(err); !ok {
t.Fatalf("TestEtcdClient failed, %#v", err)
}
+ defer etcd.Close()
}
diff --git a/server/rest/metrics.go b/server/rest/metrics.go
index dfe3ae2..2aabd45 100644
--- a/server/rest/metrics.go
+++ b/server/rest/metrics.go
@@ -62,7 +62,7 @@ func init() {
func ReportRequestCompleted(w http.ResponseWriter, r *http.Request, start time.Time) {
instance := metric.InstanceName()
elapsed := float64(time.Since(start).Nanoseconds()) / float64(time.Microsecond)
- route, _ := r.Context().Value(rest.CTX_MATCH_PATTERN).(string)
+ route, _ := r.Context().Value(rest.CTX_MATCH_FUNC).(string)
if strings.Index(r.Method, "WATCH") != 0 {
reqDurations.WithLabelValues(r.Method, instance, route).Observe(elapsed)