You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/05/21 00:36:29 UTC
[servicecomb-service-center] branch v1.x updated: SCB-2176 Refactor
websocket (#986) (#989)
This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch v1.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/v1.x by this push:
new b5e162d SCB-2176 Refactor websocket (#986) (#989)
b5e162d is described below
commit b5e162da02db641385ce45ee278842ab313d10ea
Author: little-cui <su...@qq.com>
AuthorDate: Fri May 21 08:36:18 2021 +0800
SCB-2176 Refactor websocket (#986) (#989)
* SCB-2176 Refactor websocket (#986)
* SCB-2176 Refactor event bus
* SCB-2176 Fix: UT failures
* SCB-2176 Refactor websocket (#980)
* SCB-2176 Refactor websocket
* SCB-2176 Add UTs
* SCB-2176 Fix: UT failures
* SCB-2176 Fix: websocket health check failed
* SCB-2176 Add benchmark for 10K connection
* SCB-2176 Add benchmark for 10K connection
* SCB-2176 Add event metrics
(cherry picked from commit cc0f684449f6606c45c86d53508428a50eb32b84)
* SCB-2176 Fix: export metrics APIs
* SCB-2176 Resolve comments
---
examples/infrastructures/docker/README.md | 2 +-
go.mod | 1 +
go.sum | 3 +
integration/health-metrics-grafana.json | 174 ++++++++++++++++--------
integration/instances_test.go | 127 ++++++++++++++++-
pkg/util/context.go | 14 +-
server/connection/metrics.go | 27 +++-
server/connection/ws/websocket.go | 24 +---
server/connection/ws/websocket_test.go | 2 +-
server/event/instance_subscriber.go | 44 +++---
server/metric/prometheus/metrics.go | 5 +
server/metric/prometheus/reporter.go | 5 +-
server/plugin/discovery/etcd/cacher_kv.go | 1 +
server/plugin/discovery/metrics.go | 34 ++++-
server/plugin/discovery/servicecenter/common.go | 30 ++--
server/service/event/instance_event_handler.go | 47 ++++---
server/service/util/microservice_util.go | 9 --
17 files changed, 404 insertions(+), 145 deletions(-)
diff --git a/examples/infrastructures/docker/README.md b/examples/infrastructures/docker/README.md
index 1955c64..90c3b72 100644
--- a/examples/infrastructures/docker/README.md
+++ b/examples/infrastructures/docker/README.md
@@ -5,7 +5,7 @@ A simple demo to deploy ServiceCenter in docker environment.
## Quick Start
```bash
-cd $PROJECT_ROOT/integration/docker
+cd $PROJECT_ROOT/examples/infrastructures/docker
docker-compose up
```
This will start up ServiceCenter listening on `:30100` for handling requests and Dashboard listening on `:30103`.
diff --git a/go.mod b/go.mod
index 7d7edd3..7a194dc 100644
--- a/go.mod
+++ b/go.mod
@@ -10,6 +10,7 @@ require (
github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea // v4
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/elithrar/simple-scrypt v1.3.0
+ github.com/go-chassis/cari v0.3.0
github.com/go-chassis/foundation v0.3.0
github.com/go-chassis/go-archaius v1.3.2
github.com/go-chassis/go-chassis v0.0.0-20200826064053-d90be848aa10
diff --git a/go.sum b/go.sum
index a04e695..4b9d25b 100644
--- a/go.sum
+++ b/go.sum
@@ -75,6 +75,7 @@ github.com/davecgh/go-spew v0.0.0-20151105211317-5215b55f46b2/go.mod h1:J7Y8YcW2
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/deckarep/golang-set v1.7.1/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96 h1:cenwrSVm+Z7QLSV/BsnenAOcDXdX4cMv4wP0B/5QbPg=
@@ -109,6 +110,8 @@ github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3 h1:t8FVkw33L+wilf2
github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s=
github.com/gin-gonic/gin v1.4.0 h1:3tMoCCfM7ppqsR0ptz/wi1impNpT7/9wQtMZ8lr1mCQ=
github.com/gin-gonic/gin v1.4.0/go.mod h1:OW2EZn3DO8Ln9oIKOvM++LBO+5UPHJJDH72/q/3rZdM=
+github.com/go-chassis/cari v0.3.0 h1:ysEX1t9dBObshebFKca3zhrWFqyPvcIZo2r66IyJjuk=
+github.com/go-chassis/cari v0.3.0/go.mod h1:Ie2lW11Y5ZFClY9z7bhAwK6BoNxqGSf3fYGs4mPFs74=
github.com/go-chassis/foundation v0.1.1-0.20191113114104-2b05871e9ec4/go.mod h1:21/ajGtgJlWTCeM0TxGJdRhO8bJkKirWyV8Stlh6g6c=
github.com/go-chassis/foundation v0.1.1-0.20200825060850-b16bf420f7b3/go.mod h1:21/ajGtgJlWTCeM0TxGJdRhO8bJkKirWyV8Stlh6g6c=
github.com/go-chassis/foundation v0.3.0 h1:jG4BIrK8fXD9jbTtJ5rOLGQZ1pQI/mLnDuVJzToCtos=
diff --git a/integration/health-metrics-grafana.json b/integration/health-metrics-grafana.json
index f730b2c..84f963b 100644
--- a/integration/health-metrics-grafana.json
+++ b/integration/health-metrics-grafana.json
@@ -1,18 +1,3 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License
-
{
"__inputs": [
{
@@ -2183,7 +2168,7 @@
"steppedLine": false,
"targets": [
{
- "expr": "sum(irate(service_center_db_backend_event_total{job=\"service-center\"}[1m])) by (instance,prefix)",
+ "expr": "sum(irate(service_center_db_dispatch_event_total{job=\"service-center\"}[1m])) by (instance,prefix)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "{{instance}}> {{prefix}}",
@@ -2193,7 +2178,7 @@
"thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "Publish Events",
+ "title": "Produce Events",
"tooltip": {
"shared": true,
"sort": 0,
@@ -2277,7 +2262,7 @@
"steppedLine": false,
"targets": [
{
- "expr": "max(avg_over_time(service_center_db_backend_event_durations_microseconds{job=\"service-center\"}[1m])) by (instance, prefix)",
+ "expr": "max(avg_over_time(service_center_db_dispatch_event_durations_microseconds{job=\"service-center\"}[1m])) by (instance, prefix)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "{{instance}}> {{prefix}}",
@@ -2287,7 +2272,7 @@
"thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "Publish Events Latency",
+ "title": "Produce Events Latency",
"tooltip": {
"shared": true,
"sort": 0,
@@ -2381,7 +2366,7 @@
"thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "Subscribe Events",
+ "title": "Post Events",
"tooltip": {
"shared": true,
"sort": 0,
@@ -2475,7 +2460,7 @@
"thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "Subscribe Events Latency",
+ "title": "Post Events Latency",
"tooltip": {
"shared": true,
"sort": 0,
@@ -2526,8 +2511,91 @@
"x": 0,
"y": 40
},
+ "id": 48,
+ "legend": {
+ "avg": false,
+ "current": false,
+ "max": false,
+ "min": false,
+ "show": false,
+ "total": false,
+ "values": false
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr": "sum(service_center_notify_pending_total{job=\"service-center\"}) by (instance,source)",
+ "format": "time_series",
+ "intervalFactor": 1,
+ "refId": "A"
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "Pending Events",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "${DS_LOCAL}",
+ "fill": 1,
+ "gridPos": {
+ "h": 6,
+ "w": 6,
+ "x": 6,
+ "y": 40
+ },
"height": "",
- "id": 37,
+ "id": 43,
"legend": {
"alignAsTable": false,
"avg": false,
@@ -2558,18 +2626,17 @@
"steppedLine": false,
"targets": [
{
- "expr": "sum(irate(service_center_db_backend_operation_total{job=\"service-center\"}[1m])) by (instance,operation)",
+ "expr": "service_center_notify_subscriber_total{job=\"service-center\"}",
"format": "time_series",
- "instant": false,
"intervalFactor": 1,
- "legendFormat": "{{instance}}> {{operation}}",
- "refId": "A"
+ "legendFormat": "{{instance}}> {{domain}} {{scheme}}",
+ "refId": "B"
}
],
"thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "Backend Operations",
+ "title": "Subscribers",
"tooltip": {
"shared": true,
"sort": 0,
@@ -2587,7 +2654,7 @@
"yaxes": [
{
"decimals": 0,
- "format": "ops",
+ "format": "none",
"label": null,
"logBase": 1,
"max": null,
@@ -2618,25 +2685,25 @@
"gridPos": {
"h": 6,
"w": 6,
- "x": 6,
+ "x": 12,
"y": 40
},
"height": "",
- "id": 38,
+ "id": 37,
"legend": {
"alignAsTable": false,
- "avg": true,
+ "avg": false,
"current": false,
"hideEmpty": true,
"hideZero": true,
- "max": true,
- "min": true,
- "rightSide": true,
+ "max": false,
+ "min": false,
+ "rightSide": false,
"show": false,
"sort": "avg",
"sortDesc": true,
"total": false,
- "values": true
+ "values": false
},
"lines": true,
"linewidth": 1,
@@ -2653,17 +2720,18 @@
"steppedLine": false,
"targets": [
{
- "expr": "max(avg_over_time(service_center_db_backend_operation_durations_microseconds{job=\"service-center\"}[1m])) by (instance,operation)",
+ "expr": "sum(irate(service_center_db_backend_operation_total{job=\"service-center\"}[1m])) by (instance,operation)",
"format": "time_series",
+ "instant": false,
"intervalFactor": 1,
"legendFormat": "{{instance}}> {{operation}}",
- "refId": "B"
+ "refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "Backend Operations Latency",
+ "title": "Backend Operations",
"tooltip": {
"shared": true,
"sort": 0,
@@ -2680,7 +2748,8 @@
},
"yaxes": [
{
- "format": "µs",
+ "decimals": 0,
+ "format": "ops",
"label": null,
"logBase": 1,
"max": null,
@@ -2711,25 +2780,25 @@
"gridPos": {
"h": 6,
"w": 6,
- "x": 12,
+ "x": 18,
"y": 40
},
"height": "",
- "id": 43,
+ "id": 38,
"legend": {
"alignAsTable": false,
- "avg": false,
+ "avg": true,
"current": false,
"hideEmpty": true,
"hideZero": true,
- "max": false,
- "min": false,
- "rightSide": false,
+ "max": true,
+ "min": true,
+ "rightSide": true,
"show": false,
"sort": "avg",
"sortDesc": true,
"total": false,
- "values": false
+ "values": true
},
"lines": true,
"linewidth": 1,
@@ -2746,17 +2815,17 @@
"steppedLine": false,
"targets": [
{
- "expr": "service_center_notify_subscriber_total{job=\"service-center\"}",
+ "expr": "max(avg_over_time(service_center_db_backend_operation_durations_microseconds{job=\"service-center\"}[1m])) by (instance,operation)",
"format": "time_series",
"intervalFactor": 1,
- "legendFormat": "{{instance}}> {{domain}} {{scheme}}",
+ "legendFormat": "{{instance}}> {{operation}}",
"refId": "B"
}
],
"thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "Subscribers",
+ "title": "Backend Operations Latency",
"tooltip": {
"shared": true,
"sort": 0,
@@ -2773,8 +2842,7 @@
},
"yaxes": [
{
- "decimals": 0,
- "format": "none",
+ "format": "µs",
"label": null,
"logBase": 1,
"max": null,
@@ -3434,7 +3502,7 @@
"list": []
},
"time": {
- "from": "now-5m",
+ "from": "now-30m",
"to": "now"
},
"timepicker": {
@@ -3465,5 +3533,5 @@
"timezone": "",
"title": "ServiceCenter",
"uid": "Zg6NoHGiz",
- "version": 12
+ "version": 17
}
\ No newline at end of file
diff --git a/integration/instances_test.go b/integration/instances_test.go
index b84fbc4..fcc9108 100644
--- a/integration/instances_test.go
+++ b/integration/instances_test.go
@@ -18,14 +18,19 @@ package integrationtest_test
import (
"encoding/json"
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
- "github.com/widuu/gojson"
+ "fmt"
+ "github.com/go-chassis/cari/discovery"
+ "github.com/gorilla/websocket"
+ "github.com/stretchr/testify/assert"
"net/http"
"strings"
+ "sync"
"bytes"
. "github.com/apache/servicecomb-service-center/integration"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ "github.com/widuu/gojson"
"io/ioutil"
"math/rand"
"strconv"
@@ -691,3 +696,142 @@ func BenchmarkRegisterMicroServiceInstance(b *testing.B) {
Expect(resp.StatusCode).To(Equal(http.StatusOK))
}
}
+
+// BenchmarkInstanceWatch registers a provider service with one instance,
+// opens N websocket watchers on it, fires E metadata updates against the
+// instance and prints the "tag" property and observed delay of every event a
+// watcher receives. A watcher stops after 30s without a message.
+func BenchmarkInstanceWatch(t *testing.B) {
+	scclient = insecurityConnection
+	var serviceId, instanceId string
+
+	prepared := t.Run("prepare data", func(t *testing.B) {
+		// service
+		serviceName := "testInstance" + strconv.Itoa(rand.Int())
+		servicemap := map[string]interface{}{
+			"serviceName": serviceName,
+			"appId":       "testApp",
+			"version":     "1.0",
+		}
+		bodyParams := map[string]interface{}{
+			"service": servicemap,
+		}
+		body, _ := json.Marshal(bodyParams)
+		bodyBuf := bytes.NewReader(body)
+		req, _ := http.NewRequest(POST, SCURL+REGISTERMICROSERVICE, bodyBuf)
+		req.Header.Set("X-Domain-Name", "default")
+		resp, err := scclient.Do(req)
+		if err != nil {
+			t.Fatal(err)
+		}
+		assert.Equal(t, http.StatusOK, resp.StatusCode)
+		respbody, _ := ioutil.ReadAll(resp.Body)
+		serviceId = gojson.Json(string(respbody)).Get("serviceId").Tostring()
+		resp.Body.Close()
+
+		// instance
+		healthcheck := map[string]interface{}{
+			"mode":     "push",
+			"interval": 30000,
+			"times":    20000,
+		}
+		instance := map[string]interface{}{
+			"hostName":    "cse",
+			"healthCheck": healthcheck,
+		}
+		bodyParams = map[string]interface{}{
+			"instance": instance,
+		}
+		body, _ = json.Marshal(bodyParams)
+		bodyBuf = bytes.NewReader(body)
+		req, _ = http.NewRequest(POST, SCURL+strings.Replace(REGISTERINSTANCE, ":serviceId", serviceId, 1), bodyBuf)
+		req.Header.Set("X-Domain-Name", "default")
+		resp, err = scclient.Do(req)
+		if err != nil {
+			t.Fatal(err)
+		}
+		respbody, _ = ioutil.ReadAll(resp.Body)
+		instanceId = gojson.Json(string(respbody)).Get("instanceId").Tostring()
+		resp.Body.Close()
+
+		req, _ = http.NewRequest(GET, SCURL+FINDINSTANCE+"?appId=testApp&serviceName="+serviceName+"&version=1.0", nil)
+		req.Header.Set("X-Domain-Name", "default")
+		req.Header.Set("X-ConsumerId", serviceId)
+		resp, err = scclient.Do(req)
+		if err != nil {
+			t.Fatal(err)
+		}
+		resp.Body.Close()
+
+		assert.Equal(t, http.StatusOK, resp.StatusCode)
+	})
+	if !prepared {
+		// every step below needs serviceId/instanceId
+		t.FailNow()
+	}
+
+	t.Run("test 10K connection", func(tb *testing.B) {
+		// N websocket watchers, each expected to receive the E fired events.
+		const N, E = 2500, 2500
+		var okWg sync.WaitGroup
+		okWg.Add(N)
+
+		tb.Run("new 10K connection", func(t *testing.B) {
+			url := strings.ReplaceAll(strings.ReplaceAll(SCURL, "http://", "ws://")+INSTANCEWATCHER, ":serviceId", serviceId)
+			for i := 0; i < N; i++ {
+				go func() {
+					// Done on every exit path so the final Wait cannot hang.
+					defer okWg.Done()
+					conn, _, err := websocket.DefaultDialer.Dial(url, nil)
+					if !assert.NoError(tb, err) { // tb outlives this sub-benchmark; t may not
+						return
+					}
+					defer conn.Close()
+					for {
+						_ = conn.SetReadDeadline(time.Now().Add(30 * time.Second))
+						_, data, err := conn.ReadMessage()
+						if err != nil {
+							return
+						}
+
+						var response discovery.WatchInstanceResponse
+						if err := json.Unmarshal(data, &response); err != nil || response.Instance == nil {
+							continue
+						}
+						instance := response.Instance
+						timestamp, _ := strconv.ParseInt(instance.ModTimestamp, 10, 64)
+						sub := time.Now().Sub(time.Unix(timestamp, 0))
+						fmt.Println(instance.Properties["tag"], sub)
+					}
+				}()
+			}
+			<-time.After(10 * time.Second)
+		})
+
+		tb.Run("fire 10K event", func(t *testing.B) {
+			for i := 0; i < E; i++ {
+				propertiesInstance := map[string]interface{}{
+					"tag": strconv.Itoa(i),
+				}
+				bodyParams := map[string]interface{}{
+					"properties": propertiesInstance,
+				}
+				url := strings.Replace(UPDATEINSTANCEMETADATA, ":serviceId", serviceId, 1)
+				url = strings.Replace(url, ":instanceId", instanceId, 1)
+				body, _ := json.Marshal(bodyParams)
+				bodyBuf := bytes.NewReader(body)
+				req, _ := http.NewRequest(UPDATE, SCURL+url, bodyBuf)
+				req.Header.Set("X-Domain-Name", "default")
+				resp, err := scclient.Do(req)
+				if !assert.NoError(t, err) {
+					continue
+				}
+				resp.Body.Close()
+			}
+		})
+
+		tb.Run("wait", func(t *testing.B) {
+			okWg.Wait()
+		})
+	})
+}
diff --git a/pkg/util/context.go b/pkg/util/context.go
index d1d4ceb..2c97d3d 100644
--- a/pkg/util/context.go
+++ b/pkg/util/context.go
@@ -20,6 +20,7 @@ package util
import (
"context"
"net/http"
+ "strings"
"time"
)
@@ -28,6 +29,7 @@ const (
CtxProject = "project"
CtxTargetDomain = "target-domain"
CtxTargetProject = "target-project"
+ SPLIT = "/"
)
type StringContext struct {
@@ -119,11 +121,11 @@ func SetRequestContext(r *http.Request, key string, val interface{}) *http.Reque
}
func ParseDomainProject(ctx context.Context) string {
- return ParseDomain(ctx) + "/" + ParseProject(ctx)
+ return ParseDomain(ctx) + SPLIT + ParseProject(ctx)
}
func ParseTargetDomainProject(ctx context.Context) string {
- return ParseTargetDomain(ctx) + "/" + ParseTargetProject(ctx)
+ return ParseTargetDomain(ctx) + SPLIT + ParseTargetProject(ctx)
}
func ParseDomain(ctx context.Context) string {
@@ -178,6 +180,17 @@ func SetDomainProject(ctx context.Context, domain string, project string) contex
return SetProject(SetDomain(ctx, domain), project)
}
+// SetDomainProjectString splits a "domain<SPLIT>project" string and stores
+// both parts in ctx. When the string does not contain exactly one SPLIT
+// separator, ctx is returned unchanged.
+func SetDomainProjectString(ctx context.Context, domainProject string) context.Context {
+	parts := strings.Split(domainProject, SPLIT)
+	if len(parts) == 2 {
+		return SetDomainProject(ctx, parts[0], parts[1])
+	}
+	return ctx
+}
+
func SetTargetDomainProject(ctx context.Context, domain string, project string) context.Context {
return SetTargetProject(SetTargetDomain(ctx, domain), project)
}
diff --git a/server/connection/metrics.go b/server/connection/metrics.go
index 76aa41e..a83b229 100644
--- a/server/connection/metrics.go
+++ b/server/connection/metrics.go
@@ -47,6 +47,23 @@ var (
Objectives: metric.Pxx,
}, []string{"instance", "source", "status"})
+ pendingGauge = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: metric.FamilyName,
+ Subsystem: "notify",
+ Name: "pending_total",
+ Help: "Counter of pending instance events",
+ }, []string{"instance", "source"})
+
+ pendingLatency = prometheus.NewSummaryVec(
+ prometheus.SummaryOpts{
+ Namespace: metric.FamilyName,
+ Subsystem: "notify",
+ Name: "pending_durations_microseconds",
+ Help: "Latency of pending instance events",
+ Objectives: metric.Pxx,
+ }, []string{"instance", "source"})
+
subscriberGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: metric.FamilyName,
@@ -57,7 +74,7 @@ var (
)
func init() {
- prometheus.MustRegister(notifyCounter, notifyLatency, subscriberGauge)
+ prometheus.MustRegister(notifyCounter, notifyLatency, subscriberGauge, pendingLatency, pendingGauge)
}
func ReportPublishCompleted(evt event.Event, err error) {
@@ -69,6 +86,17 @@ func ReportPublishCompleted(evt event.Event, err error) {
}
notifyLatency.WithLabelValues(instance, evt.Type().String(), status).Observe(elapsed)
notifyCounter.WithLabelValues(instance, evt.Type().String(), status).Inc()
+	pendingGauge.WithLabelValues(instance, evt.Type().String()).Dec()
+}
+
+// ReportPendingCompleted observes the time elapsed since evt was created in
+// the pending latency summary and increments the pending gauge for the
+// event's type; ReportPublishCompleted performs the matching decrement.
+func ReportPendingCompleted(evt event.Event) {
+	instance, eventType := metric.InstanceName(), evt.Type().String()
+	elapsed := float64(time.Since(evt.CreateAt()).Nanoseconds()) / float64(time.Microsecond)
+	pendingLatency.WithLabelValues(instance, eventType).Observe(elapsed)
+	pendingGauge.WithLabelValues(instance, eventType).Inc()
}
func ReportSubscriber(domain, scheme string, n float64) {
diff --git a/server/connection/ws/websocket.go b/server/connection/ws/websocket.go
index 24a7e2b..380c9db 100644
--- a/server/connection/ws/websocket.go
+++ b/server/connection/ws/websocket.go
@@ -19,16 +19,19 @@ package ws
import (
"context"
- "fmt"
+ "errors"
+ "time"
+
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/server/connection"
serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
"github.com/gorilla/websocket"
- "time"
)
const Websocket = "Websocket"
+var errServiceNotExist = errors.New("service does not exist")
+
type WebSocket struct {
Options
Conn *websocket.Conn
@@ -146,13 +149,11 @@ func (wh *WebSocket) CheckHealth(ctx context.Context) error {
}
if !serviceUtil.ServiceExist(ctx, wh.DomainProject, wh.ConsumerID) {
- return fmt.Errorf("Service does not exist.")
+ return errServiceNotExist
}
remoteAddr := wh.Conn.RemoteAddr().String()
if err := wh.WritePingPong(websocket.PingMessage); err != nil {
- log.Errorf(err, "send 'Ping' message to subscriber[%s] failed, consumer: %s",
- remoteAddr, wh.ConsumerID)
return err
}
@@ -162,18 +163,7 @@ func (wh *WebSocket) CheckHealth(ctx context.Context) error {
}
func (wh *WebSocket) WritePingPong(messageType int) error {
- err := wh.Conn.WriteControl(messageType, []byte{}, time.Now().Add(wh.SendTimeout))
- if err != nil {
- messageTypeName := "Ping"
- if messageType == websocket.PongMessage {
- messageTypeName = "Pong"
- }
- log.Errorf(err, "fail to send '%s' to subscriber[%s], consumer: %s",
- messageTypeName, wh.Conn.RemoteAddr(), wh.ConsumerID)
- //wh.subscriber.SetError(err)
- return err
- }
- return nil
+ return wh.Conn.WriteControl(messageType, []byte{}, time.Now().Add(wh.SendTimeout))
}
func (wh *WebSocket) WriteTextMessage(message []byte) error {
diff --git a/server/connection/ws/websocket_test.go b/server/connection/ws/websocket_test.go
index ca40bd6..7518b5a 100644
--- a/server/connection/ws/websocket_test.go
+++ b/server/connection/ws/websocket_test.go
@@ -159,6 +159,6 @@ func TestWebSocket_CheckHealth(t *testing.T) {
})
t.Run("should return err when consumer not exist", func(t *testing.T) {
ws := wss.NewWebSocket("", "", mock.ServerConn)
- assert.Equal(t, "Service does not exist.", ws.CheckHealth(context.Background()).Error())
+ assert.Equal(t, "service does not exist", ws.CheckHealth(context.Background()).Error())
})
}
diff --git a/server/event/instance_subscriber.go b/server/event/instance_subscriber.go
index cb1895f..f270fcb 100644
--- a/server/event/instance_subscriber.go
+++ b/server/event/instance_subscriber.go
@@ -18,12 +18,13 @@
package event
import (
+ "errors"
"github.com/apache/servicecomb-service-center/pkg/event"
"github.com/apache/servicecomb-service-center/pkg/log"
- "time"
+ "github.com/apache/servicecomb-service-center/server/connection"
)
-const AddJobTimeout = 1 * time.Second
+var errBusy = errors.New("too busy")
type InstanceSubscriber struct {
event.Subscriber
@@ -47,40 +48,61 @@ func (w *InstanceSubscriber) OnAccept() {
}
//被通知
-func (w *InstanceSubscriber) OnMessage(job event.Event) {
+func (w *InstanceSubscriber) OnMessage(evt event.Event) {
if w.Err() != nil {
return
}
- wJob, ok := job.(*InstanceEvent)
+	wJob, ok := evt.(*InstanceEvent)
if !ok {
return
}
w.sendMessage(wJob)
}
-func (w *InstanceSubscriber) sendMessage(job *InstanceEvent) {
+// sendMessage reports evt as pending and enqueues it on w.Job without
+// blocking. When the queue is full, the events already queued are drained
+// and reported as failed with errBusy, then evt is enqueued (or dropped too).
+func (w *InstanceSubscriber) sendMessage(evt *InstanceEvent) {
defer log.Recover()
+
+	connection.ReportPendingCompleted(evt)
+
select {
- case w.Job <- job:
+	case w.Job <- evt:
default:
- timer := time.NewTimer(w.Timeout())
- select {
- case w.Job <- job:
- timer.Stop()
- case <-timer.C:
- log.Errorf(nil,
- "the %s watcher %s %s event queue is full[over %s], drop the event %v",
- w.Type(), w.Group(), w.Subject(), w.Timeout(), job)
- }
+		log.Errorf(nil, "the %s watcher %s %s event queue is full, drop the blocked events",
+			w.Type(), w.Group(), w.Subject())
+		w.cleanup()
+		// Never block the publisher: if the queue still cannot take evt (for
+		// example it was refilled concurrently), drop evt as well.
+		select {
+		case w.Job <- evt:
+		default:
+			connection.ReportPublishCompleted(evt, errBusy)
+		}
}
}
-func (w *InstanceSubscriber) Timeout() time.Duration {
- return AddJobTimeout
+// cleanup drains, without blocking, every event currently buffered in w.Job
+// and reports each one as failed with errBusy. It returns once the queue is
+// empty or closed.
+func (w *InstanceSubscriber) cleanup() {
+	for {
+		select {
+		case evt, ok := <-w.Job:
+			if !ok {
+				return
+			}
+			connection.ReportPublishCompleted(evt, errBusy)
+		default:
+			return
+		}
+	}
}
func (w *InstanceSubscriber) Close() {
+	w.cleanup()
close(w.Job)
}
diff --git a/server/metric/prometheus/metrics.go b/server/metric/prometheus/metrics.go
index 2c2c5c7..e456927 100644
--- a/server/metric/prometheus/metrics.go
+++ b/server/metric/prometheus/metrics.go
@@ -21,8 +21,10 @@ import (
"github.com/apache/servicecomb-service-center/pkg/rest"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/metric"
+ api "github.com/apache/servicecomb-service-center/server/rest"
"github.com/prometheus/client_golang/prometheus"
"net/http"
+ "os"
"strconv"
"strings"
"time"
@@ -65,6 +67,9 @@ var (
func init() {
prometheus.MustRegister(incomingRequests, successfulRequests, reqDurations, queryPerSeconds)
+ if "true" == os.Getenv("METRICS_ENABLE") {
+ api.RegisterServerHandler("/metrics", prometheus.Handler())
+ }
}
func ReportRequestCompleted(w http.ResponseWriter, r *http.Request, start time.Time) {
diff --git a/server/metric/prometheus/reporter.go b/server/metric/prometheus/reporter.go
index f089b90..32cc6df 100644
--- a/server/metric/prometheus/reporter.go
+++ b/server/metric/prometheus/reporter.go
@@ -18,7 +18,6 @@ package prometheus
import (
"github.com/apache/servicecomb-service-center/server/metric"
dto "github.com/prometheus/client_model/go"
- "os"
)
const (
@@ -65,9 +64,7 @@ func (r *APIReporter) toLabels(pairs []*dto.LabelPair) (labels []string) {
}
func init() {
- if "true" == os.Getenv("METRICS_ENABLE") {
- metric.RegisterReporter("rest", NewAPIReporter())
- }
+ metric.RegisterReporter("rest", NewAPIReporter())
}
func NewAPIReporter() *APIReporter {
diff --git a/server/plugin/discovery/etcd/cacher_kv.go b/server/plugin/discovery/etcd/cacher_kv.go
index a61d1e3..5a9beea 100644
--- a/server/plugin/discovery/etcd/cacher_kv.go
+++ b/server/plugin/discovery/etcd/cacher_kv.go
@@ -451,6 +451,7 @@ func (c *KvCacher) notify(evts []discovery.KvEvent) {
for _, evt := range evts {
c.Cfg.OnEvent(evt)
}
+ discovery.ReportDispatchEventCompleted(c.Cfg.Key, evts)
}
func (c *KvCacher) doParse(src *mvccpb.KeyValue) (kv *discovery.KeyValue) {
diff --git a/server/plugin/discovery/metrics.go b/server/plugin/discovery/metrics.go
index 4cd41a2..ce434ca 100644
--- a/server/plugin/discovery/metrics.go
+++ b/server/plugin/discovery/metrics.go
@@ -39,10 +39,29 @@ var (
Help: "Latency of backend events processing",
Objectives: metric.Pxx,
}, []string{"instance", "prefix"})
+
+	// dispatchCounter counts events reported via ReportDispatchEventCompleted;
+	// it is a counter (not a gauge) so that rate()/irate() over it is valid.
+	dispatchCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: metric.FamilyName,
+			Subsystem: "db",
+			Name:      "dispatch_event_total",
+			Help:      "Counter of backend events dispatch",
+		}, []string{"instance", "prefix"})
+
+	dispatchLatency = prometheus.NewSummaryVec(
+		prometheus.SummaryOpts{
+			Namespace:  metric.FamilyName,
+			Subsystem:  "db",
+			Name:       "dispatch_event_durations_microseconds",
+			Help:       "Latency of backend events dispatch",
+			Objectives: metric.Pxx,
+		}, []string{"instance", "prefix"})
)
func init() {
- prometheus.MustRegister(eventsCounter, eventsLatency)
+ prometheus.MustRegister(eventsCounter, eventsLatency, dispatchCounter, dispatchLatency)
}
func ReportProcessEventCompleted(prefix string, evts []KvEvent) {
@@ -57,4 +76,21 @@ func ReportProcessEventCompleted(prefix string, evts []KvEvent) {
eventsLatency.WithLabelValues(instance, prefix).Observe(elapsed)
}
eventsCounter.WithLabelValues(instance, prefix).Add(l)
+}
+
+// ReportDispatchEventCompleted observes, for each event, the time since its
+// CreateAt in the dispatch latency summary and adds len(evts) to the
+// dispatch counter. It does nothing for an empty slice.
+func ReportDispatchEventCompleted(prefix string, evts []KvEvent) {
+	l := float64(len(evts))
+	if l == 0 {
+		return
+	}
+	instance := metric.InstanceName()
+	now := time.Now()
+	for _, evt := range evts {
+		elapsed := float64(now.Sub(evt.CreateAt).Nanoseconds()) / float64(time.Microsecond)
+		dispatchLatency.WithLabelValues(instance, prefix).Observe(elapsed)
+	}
+	dispatchCounter.WithLabelValues(instance, prefix).Add(l)
}
diff --git a/server/plugin/discovery/servicecenter/common.go b/server/plugin/discovery/servicecenter/common.go
index d59957f..a107db1 100644
--- a/server/plugin/discovery/servicecenter/common.go
+++ b/server/plugin/discovery/servicecenter/common.go
@@ -1,17 +1,19 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package servicecenter
diff --git a/server/service/event/instance_event_handler.go b/server/service/event/instance_event_handler.go
index 0d6010d..46f9b3a 100644
--- a/server/service/event/instance_event_handler.go
+++ b/server/service/event/instance_event_handler.go
@@ -18,6 +18,9 @@ package event
import (
"context"
+ "fmt"
+ "strings"
+
"github.com/apache/servicecomb-service-center/pkg/log"
pb "github.com/apache/servicecomb-service-center/pkg/registry"
"github.com/apache/servicecomb-service-center/pkg/util"
@@ -29,7 +32,6 @@ import (
"github.com/apache/servicecomb-service-center/server/service/cache"
"github.com/apache/servicecomb-service-center/server/service/metrics"
serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
- "strings"
)
const (
@@ -57,12 +59,15 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
domainName := domainProject[:idx]
projectName := domainProject[idx+1:]
+ ctx := context.WithValue(context.WithValue(context.Background(),
+ util.CtxCacheOnly, "1"),
+ util.CtxGlobal, "1")
+
var count float64 = increaseOne
- switch action {
- case pb.EVT_INIT:
+ if action == pb.EVT_INIT {
metrics.ReportInstances(domainName, count)
- ms := serviceUtil.GetServiceFromCache(domainProject, providerID)
- if ms == nil {
+ ms, err := serviceUtil.GetService(ctx, domainProject, providerID)
+ if err != nil {
log.Warnf("caught [%s] instance[%s/%s] event, endpoints %v, get cached provider's file failed",
action, providerID, providerInstanceID, instance.Endpoints)
return
@@ -70,11 +75,10 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
frameworkName, frameworkVersion := getFramework(ms)
metrics.ReportFramework(domainName, projectName, frameworkName, frameworkVersion, count)
return
- case pb.EVT_CREATE:
- metrics.ReportInstances(domainName, count)
- case pb.EVT_DELETE:
+ }
+
+ if action == pb.EVT_DELETE {
count = decreaseOne
- metrics.ReportInstances(domainName, count)
if !apt.IsDefaultDomainProject(domainProject) {
projectName := domainProject[idx+1:]
serviceUtil.RemandInstanceQuota(
@@ -82,25 +86,25 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
}
}
- if event.Center().Closed() {
- log.Warnf("caught [%s] instance[%s/%s] event, endpoints %v, but notify service is closed",
- action, providerID, providerInstanceID, instance.Endpoints)
+ // 查询服务版本信息
+ ms, err := serviceUtil.GetService(ctx, domainProject, providerID)
+ if err != nil {
+ log.Error(fmt.Sprintf("caught [%s] instance[%s/%s] event, endpoints %v, get cached provider's file failed",
+ action, providerID, providerInstanceID, instance.Endpoints), err)
return
}
- // 查询服务版本信息
- ctx := context.WithValue(context.WithValue(context.Background(),
- util.CtxCacheOnly, "1"),
- util.CtxGlobal, "1")
- ms, err := serviceUtil.GetService(ctx, domainProject, providerID)
- if ms == nil {
- log.Errorf(err, "caught [%s] instance[%s/%s] event, endpoints %v, get cached provider's file failed",
+ if event.Center().Closed() {
+ log.Warnf("caught [%s] instance[%s/%s] event, endpoints %v, but notify service is closed",
action, providerID, providerInstanceID, instance.Endpoints)
return
}
- frameworkName, frameworkVersion := getFramework(ms)
- metrics.ReportFramework(domainName, projectName, frameworkName, frameworkVersion, count)
+ if action != pb.EVT_UPDATE {
+ frameworkName, frameworkVersion := getFramework(ms)
+ metrics.ReportInstances(domainName, count)
+ metrics.ReportFramework(domainName, projectName, frameworkName, frameworkVersion, count)
+ }
log.Infof("caught [%s] service[%s][%s/%s/%s/%s] instance[%s] event, endpoints %v",
action, providerID, ms.Environment, ms.AppId, ms.ServiceName, ms.Version,
@@ -135,7 +139,6 @@ func PublishInstanceEvent(evt discovery.KvEvent, domainProject string, serviceKe
Instance: evt.KV.Value.(*pb.MicroServiceInstance),
}
for _, consumerID := range subscribers {
- // TODO add超时怎么处理?
evt := event.NewInstanceEventWithTime(consumerID, domainProject, evt.Revision, evt.CreateAt, response)
err := event.Center().Fire(evt)
if err != nil {
diff --git a/server/service/util/microservice_util.go b/server/service/util/microservice_util.go
index c10f33f..987d60d 100644
--- a/server/service/util/microservice_util.go
+++ b/server/service/util/microservice_util.go
@@ -64,15 +64,6 @@ func GetService(ctx context.Context, domainProject string, serviceID string) (*p
return serviceResp.Kvs[0].Value.(*pb.MicroService), nil
}
-// GetServiceFromCache gets service from cache
-func GetServiceFromCache(domainProject string, serviceID string) *pb.MicroService {
- ctx := context.WithValue(context.WithValue(context.Background(),
- util.CtxCacheOnly, "1"),
- util.CtxGlobal, "1")
- svc, _ := GetService(ctx, domainProject, serviceID)
- return svc
-}
-
func getServicesRawData(ctx context.Context, domainProject string) ([]*discovery.KeyValue, error) {
key := apt.GenerateServiceKey(domainProject, "")
opts := append(FromContext(ctx),