You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/05/21 00:36:29 UTC

[servicecomb-service-center] branch v1.x updated: SCB-2176 Refactor websocket (#986) (#989)

This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch v1.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/v1.x by this push:
     new b5e162d  SCB-2176 Refactor websocket (#986) (#989)
b5e162d is described below

commit b5e162da02db641385ce45ee278842ab313d10ea
Author: little-cui <su...@qq.com>
AuthorDate: Fri May 21 08:36:18 2021 +0800

    SCB-2176 Refactor websocket (#986) (#989)
    
    * SCB-2176 Refactor websocket (#986)
    
    * SCB-2176 Refactor event bus
    
    * SCB-2176 Fix: UT failures
    
    * SCB-2176 Refactor websocket (#980)
    
    * SCB-2176 Refactor websocket
    
    * SCB-2176 Add UTs
    
    * SCB-2176 Fix: UT failures
    
    * SCB-2176 Fix: websocket health check failed
    
    * SCB-2176 Add benchmark for 10K connection
    
    * SCB-2176 Add benchmark for 10K connection
    
    * SCB-2176 Add event metrics
    
    (cherry picked from commit cc0f684449f6606c45c86d53508428a50eb32b84)
    
    * SCB-2176 Fix: export metrics APIs
    
    * SCB-2176 Resolve comments
---
 examples/infrastructures/docker/README.md       |   2 +-
 go.mod                                          |   1 +
 go.sum                                          |   3 +
 integration/health-metrics-grafana.json         | 174 ++++++++++++++++--------
 integration/instances_test.go                   | 127 ++++++++++++++++-
 pkg/util/context.go                             |  14 +-
 server/connection/metrics.go                    |  27 +++-
 server/connection/ws/websocket.go               |  24 +---
 server/connection/ws/websocket_test.go          |   2 +-
 server/event/instance_subscriber.go             |  44 +++---
 server/metric/prometheus/metrics.go             |   5 +
 server/metric/prometheus/reporter.go            |   5 +-
 server/plugin/discovery/etcd/cacher_kv.go       |   1 +
 server/plugin/discovery/metrics.go              |  34 ++++-
 server/plugin/discovery/servicecenter/common.go |  30 ++--
 server/service/event/instance_event_handler.go  |  47 ++++---
 server/service/util/microservice_util.go        |   9 --
 17 files changed, 404 insertions(+), 145 deletions(-)

diff --git a/examples/infrastructures/docker/README.md b/examples/infrastructures/docker/README.md
index 1955c64..90c3b72 100644
--- a/examples/infrastructures/docker/README.md
+++ b/examples/infrastructures/docker/README.md
@@ -5,7 +5,7 @@ A simple demo to deploy ServiceCenter in docker environment.
 ## Quick Start
 
 ```bash
-cd $PROJECT_ROOT/integration/docker
+cd examples/infrastructures/docker
 docker-compose up
 ```
 This will start up ServiceCenter listening on `:30100` for handling requests and Dashboard listening on `:30103`.
diff --git a/go.mod b/go.mod
index 7d7edd3..7a194dc 100644
--- a/go.mod
+++ b/go.mod
@@ -10,6 +10,7 @@ require (
 	github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea // v4
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
 	github.com/elithrar/simple-scrypt v1.3.0
+	github.com/go-chassis/cari v0.3.0
 	github.com/go-chassis/foundation v0.3.0
 	github.com/go-chassis/go-archaius v1.3.2
 	github.com/go-chassis/go-chassis v0.0.0-20200826064053-d90be848aa10
diff --git a/go.sum b/go.sum
index a04e695..4b9d25b 100644
--- a/go.sum
+++ b/go.sum
@@ -75,6 +75,7 @@ github.com/davecgh/go-spew v0.0.0-20151105211317-5215b55f46b2/go.mod h1:J7Y8YcW2
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/deckarep/golang-set v1.7.1/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
 github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96 h1:cenwrSVm+Z7QLSV/BsnenAOcDXdX4cMv4wP0B/5QbPg=
@@ -109,6 +110,8 @@ github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3 h1:t8FVkw33L+wilf2
 github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s=
 github.com/gin-gonic/gin v1.4.0 h1:3tMoCCfM7ppqsR0ptz/wi1impNpT7/9wQtMZ8lr1mCQ=
 github.com/gin-gonic/gin v1.4.0/go.mod h1:OW2EZn3DO8Ln9oIKOvM++LBO+5UPHJJDH72/q/3rZdM=
+github.com/go-chassis/cari v0.3.0 h1:ysEX1t9dBObshebFKca3zhrWFqyPvcIZo2r66IyJjuk=
+github.com/go-chassis/cari v0.3.0/go.mod h1:Ie2lW11Y5ZFClY9z7bhAwK6BoNxqGSf3fYGs4mPFs74=
 github.com/go-chassis/foundation v0.1.1-0.20191113114104-2b05871e9ec4/go.mod h1:21/ajGtgJlWTCeM0TxGJdRhO8bJkKirWyV8Stlh6g6c=
 github.com/go-chassis/foundation v0.1.1-0.20200825060850-b16bf420f7b3/go.mod h1:21/ajGtgJlWTCeM0TxGJdRhO8bJkKirWyV8Stlh6g6c=
 github.com/go-chassis/foundation v0.3.0 h1:jG4BIrK8fXD9jbTtJ5rOLGQZ1pQI/mLnDuVJzToCtos=
diff --git a/integration/health-metrics-grafana.json b/integration/health-metrics-grafana.json
index f730b2c..84f963b 100644
--- a/integration/health-metrics-grafana.json
+++ b/integration/health-metrics-grafana.json
@@ -1,18 +1,3 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License
-
 {
   "__inputs": [
     {
@@ -2183,7 +2168,7 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": "sum(irate(service_center_db_backend_event_total{job=\"service-center\"}[1m])) by (instance,prefix)",
+          "expr": "sum(irate(service_center_db_dispatch_event_total{job=\"service-center\"}[1m])) by (instance,prefix)",
           "format": "time_series",
           "intervalFactor": 1,
           "legendFormat": "{{instance}}> {{prefix}}",
@@ -2193,7 +2178,7 @@
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Publish Events",
+      "title": "Produce Events",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2277,7 +2262,7 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": "max(avg_over_time(service_center_db_backend_event_durations_microseconds{job=\"service-center\"}[1m])) by (instance, prefix)",
+          "expr": "max(avg_over_time(service_center_db_dispatch_event_durations_microseconds{job=\"service-center\"}[1m])) by (instance, prefix)",
           "format": "time_series",
           "intervalFactor": 1,
           "legendFormat": "{{instance}}> {{prefix}}",
@@ -2287,7 +2272,7 @@
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Publish Events Latency",
+      "title": "Produce Events Latency",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2381,7 +2366,7 @@
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Subscribe Events",
+      "title": "Post Events",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2475,7 +2460,7 @@
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Subscribe Events Latency",
+      "title": "Post Events Latency",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2526,8 +2511,91 @@
         "x": 0,
         "y": 40
       },
+      "id": 48,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": false,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "sum(service_center_notify_pending_total{job=\"service-center\"}) by (instance,source)",
+          "format": "time_series",
+          "intervalFactor": 1,
+          "refId": "A"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Pending Events",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_LOCAL}",
+      "fill": 1,
+      "gridPos": {
+        "h": 6,
+        "w": 6,
+        "x": 6,
+        "y": 40
+      },
       "height": "",
-      "id": 37,
+      "id": 43,
       "legend": {
         "alignAsTable": false,
         "avg": false,
@@ -2558,18 +2626,17 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": "sum(irate(service_center_db_backend_operation_total{job=\"service-center\"}[1m])) by (instance,operation)",
+          "expr": "service_center_notify_subscriber_total{job=\"service-center\"}",
           "format": "time_series",
-          "instant": false,
           "intervalFactor": 1,
-          "legendFormat": "{{instance}}> {{operation}}",
-          "refId": "A"
+          "legendFormat": "{{instance}}> {{domain}} {{scheme}}",
+          "refId": "B"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Backend Operations",
+      "title": "Subscribers",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2587,7 +2654,7 @@
       "yaxes": [
         {
           "decimals": 0,
-          "format": "ops",
+          "format": "none",
           "label": null,
           "logBase": 1,
           "max": null,
@@ -2618,25 +2685,25 @@
       "gridPos": {
         "h": 6,
         "w": 6,
-        "x": 6,
+        "x": 12,
         "y": 40
       },
       "height": "",
-      "id": 38,
+      "id": 37,
       "legend": {
         "alignAsTable": false,
-        "avg": true,
+        "avg": false,
         "current": false,
         "hideEmpty": true,
         "hideZero": true,
-        "max": true,
-        "min": true,
-        "rightSide": true,
+        "max": false,
+        "min": false,
+        "rightSide": false,
         "show": false,
         "sort": "avg",
         "sortDesc": true,
         "total": false,
-        "values": true
+        "values": false
       },
       "lines": true,
       "linewidth": 1,
@@ -2653,17 +2720,18 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": "max(avg_over_time(service_center_db_backend_operation_durations_microseconds{job=\"service-center\"}[1m])) by (instance,operation)",
+          "expr": "sum(irate(service_center_db_backend_operation_total{job=\"service-center\"}[1m])) by (instance,operation)",
           "format": "time_series",
+          "instant": false,
           "intervalFactor": 1,
           "legendFormat": "{{instance}}> {{operation}}",
-          "refId": "B"
+          "refId": "A"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Backend Operations Latency",
+      "title": "Backend Operations",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2680,7 +2748,8 @@
       },
       "yaxes": [
         {
-          "format": "µs",
+          "decimals": 0,
+          "format": "ops",
           "label": null,
           "logBase": 1,
           "max": null,
@@ -2711,25 +2780,25 @@
       "gridPos": {
         "h": 6,
         "w": 6,
-        "x": 12,
+        "x": 18,
         "y": 40
       },
       "height": "",
-      "id": 43,
+      "id": 38,
       "legend": {
         "alignAsTable": false,
-        "avg": false,
+        "avg": true,
         "current": false,
         "hideEmpty": true,
         "hideZero": true,
-        "max": false,
-        "min": false,
-        "rightSide": false,
+        "max": true,
+        "min": true,
+        "rightSide": true,
         "show": false,
         "sort": "avg",
         "sortDesc": true,
         "total": false,
-        "values": false
+        "values": true
       },
       "lines": true,
       "linewidth": 1,
@@ -2746,17 +2815,17 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": "service_center_notify_subscriber_total{job=\"service-center\"}",
+          "expr": "max(avg_over_time(service_center_db_backend_operation_durations_microseconds{job=\"service-center\"}[1m])) by (instance,operation)",
           "format": "time_series",
           "intervalFactor": 1,
-          "legendFormat": "{{instance}}> {{domain}} {{scheme}}",
+          "legendFormat": "{{instance}}> {{operation}}",
           "refId": "B"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Subscribers",
+      "title": "Backend Operations Latency",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2773,8 +2842,7 @@
       },
       "yaxes": [
         {
-          "decimals": 0,
-          "format": "none",
+          "format": "µs",
           "label": null,
           "logBase": 1,
           "max": null,
@@ -3434,7 +3502,7 @@
     "list": []
   },
   "time": {
-    "from": "now-5m",
+    "from": "now-30m",
     "to": "now"
   },
   "timepicker": {
@@ -3465,5 +3533,5 @@
   "timezone": "",
   "title": "ServiceCenter",
   "uid": "Zg6NoHGiz",
-  "version": 12
+  "version": 17
 }
\ No newline at end of file
diff --git a/integration/instances_test.go b/integration/instances_test.go
index b84fbc4..fcc9108 100644
--- a/integration/instances_test.go
+++ b/integration/instances_test.go
@@ -18,14 +18,19 @@ package integrationtest_test
 
 import (
 	"encoding/json"
-	. "github.com/onsi/ginkgo"
-	. "github.com/onsi/gomega"
-	"github.com/widuu/gojson"
+	"fmt"
+	"github.com/go-chassis/cari/discovery"
+	"github.com/gorilla/websocket"
+	"github.com/stretchr/testify/assert"
 	"net/http"
 	"strings"
+	"sync"
 
 	"bytes"
 	. "github.com/apache/servicecomb-service-center/integration"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	"github.com/widuu/gojson"
 	"io/ioutil"
 	"math/rand"
 	"strconv"
@@ -691,3 +696,119 @@ func BenchmarkRegisterMicroServiceInstance(b *testing.B) {
 		Expect(resp.StatusCode).To(Equal(http.StatusOK))
 	}
 }
+
+func BenchmarkInstanceWatch(t *testing.B) {
+	scclient = insecurityConnection
+	var serviceId, instanceId string
+
+	t.Run("prepare data", func(t *testing.B) {
+		// service
+		serviceName := "testInstance" + strconv.Itoa(rand.Int())
+		servicemap := map[string]interface{}{
+			"serviceName": serviceName,
+			"appId":       "testApp",
+			"version":     "1.0",
+		}
+		bodyParams := map[string]interface{}{
+			"service": servicemap,
+		}
+		body, _ := json.Marshal(bodyParams)
+		bodyBuf := bytes.NewReader(body)
+		req, _ := http.NewRequest(POST, SCURL+REGISTERMICROSERVICE, bodyBuf)
+		req.Header.Set("X-Domain-Name", "default")
+		resp, err := scclient.Do(req)
+		assert.NoError(t, err)
+		assert.Equal(t, http.StatusOK, resp.StatusCode)
+		respbody, _ := ioutil.ReadAll(resp.Body)
+		serviceId = gojson.Json(string(respbody)).Get("serviceId").Tostring()
+		resp.Body.Close()
+
+		// instance
+		healthcheck := map[string]interface{}{
+			"mode":     "push",
+			"interval": 30000,
+			"times":    20000,
+		}
+		instance := map[string]interface{}{
+			"hostName":    "cse",
+			"healthCheck": healthcheck,
+		}
+		bodyParams = map[string]interface{}{
+			"instance": instance,
+		}
+		body, _ = json.Marshal(bodyParams)
+		bodyBuf = bytes.NewReader(body)
+		req, _ = http.NewRequest(POST, SCURL+strings.Replace(REGISTERINSTANCE, ":serviceId", serviceId, 1), bodyBuf)
+		req.Header.Set("X-Domain-Name", "default")
+		resp, err = scclient.Do(req)
+		assert.NoError(t, err)
+		respbody, _ = ioutil.ReadAll(resp.Body)
+		instanceId = gojson.Json(string(respbody)).Get("instanceId").Tostring()
+		resp.Body.Close()
+
+		req, _ = http.NewRequest(GET, SCURL+FINDINSTANCE+"?appId=testApp&serviceName="+serviceName+"&version=1.0", nil)
+		req.Header.Set("X-Domain-Name", "default")
+		req.Header.Set("X-ConsumerId", serviceId)
+		resp, err = scclient.Do(req)
+		assert.NoError(t, err)
+		resp.Body.Close()
+
+		assert.Equal(t, http.StatusOK, resp.StatusCode)
+	})
+
+	t.Run("test 10K connection", func(t *testing.B) {
+
+		const N, E = 2500, 2500
+		var okWg sync.WaitGroup
+		okWg.Add(N)
+
+		t.Run("new 10K connection", func(t *testing.B) {
+			url := strings.ReplaceAll(strings.ReplaceAll(SCURL, "http://", "ws://")+INSTANCEWATCHER, ":serviceId", serviceId)
+			for i := 0; i < N; i++ {
+				go func() {
+					conn, _, err := websocket.DefaultDialer.Dial(url, nil)
+					assert.NoError(t, err)
+					for {
+						_ = conn.SetReadDeadline(time.Now().Add(30 * time.Second))
+						_, data, err := conn.ReadMessage()
+						if err != nil {
+							okWg.Done()
+							return
+						}
+
+						var response discovery.WatchInstanceResponse
+						_ = json.Unmarshal(data, &response)
+						instance := response.Instance
+						timestamp, _ := strconv.ParseInt(instance.ModTimestamp, 10, 64)
+						sub := time.Now().Sub(time.Unix(timestamp, 0))
+						fmt.Println(instance.Properties["tag"], sub)
+					}
+				}()
+			}
+			<-time.After(10 * time.Second)
+		})
+
+		t.Run("fire 10K event", func(t *testing.B) {
+			for i := 0; i < E; i++ {
+				propertiesInstance := map[string]interface{}{
+					"tag": strconv.Itoa(i),
+				}
+				bodyParams := map[string]interface{}{
+					"properties": propertiesInstance,
+				}
+				url := strings.Replace(UPDATEINSTANCEMETADATA, ":serviceId", serviceId, 1)
+				url = strings.Replace(url, ":instanceId", instanceId, 1)
+				body, _ := json.Marshal(bodyParams)
+				bodyBuf := bytes.NewReader(body)
+				req, _ := http.NewRequest(UPDATE, SCURL+url, bodyBuf)
+				req.Header.Set("X-Domain-Name", "default")
+				resp, _ := scclient.Do(req)
+				resp.Body.Close()
+			}
+		})
+
+		t.Run("wait", func(t *testing.B) {
+			okWg.Wait()
+		})
+	})
+}
diff --git a/pkg/util/context.go b/pkg/util/context.go
index d1d4ceb..2c97d3d 100644
--- a/pkg/util/context.go
+++ b/pkg/util/context.go
@@ -20,6 +20,7 @@ package util
 import (
 	"context"
 	"net/http"
+	"strings"
 	"time"
 )
 
@@ -28,6 +29,7 @@ const (
 	CtxProject       = "project"
 	CtxTargetDomain  = "target-domain"
 	CtxTargetProject = "target-project"
+	SPLIT            = "/"
 )
 
 type StringContext struct {
@@ -119,11 +121,11 @@ func SetRequestContext(r *http.Request, key string, val interface{}) *http.Reque
 }
 
 func ParseDomainProject(ctx context.Context) string {
-	return ParseDomain(ctx) + "/" + ParseProject(ctx)
+	return ParseDomain(ctx) + SPLIT + ParseProject(ctx)
 }
 
 func ParseTargetDomainProject(ctx context.Context) string {
-	return ParseTargetDomain(ctx) + "/" + ParseTargetProject(ctx)
+	return ParseTargetDomain(ctx) + SPLIT + ParseTargetProject(ctx)
 }
 
 func ParseDomain(ctx context.Context) string {
@@ -178,6 +180,14 @@ func SetDomainProject(ctx context.Context, domain string, project string) contex
 	return SetProject(SetDomain(ctx, domain), project)
 }
 
+func SetDomainProjectString(ctx context.Context, domainProject string) context.Context {
+	arr := strings.Split(domainProject, SPLIT)
+	if len(arr) != 2 {
+		return ctx
+	}
+	return SetProject(SetDomain(ctx, arr[0]), arr[1])
+}
+
 func SetTargetDomainProject(ctx context.Context, domain string, project string) context.Context {
 	return SetTargetProject(SetTargetDomain(ctx, domain), project)
 }
diff --git a/server/connection/metrics.go b/server/connection/metrics.go
index 76aa41e..a83b229 100644
--- a/server/connection/metrics.go
+++ b/server/connection/metrics.go
@@ -47,6 +47,23 @@ var (
 			Objectives: metric.Pxx,
 		}, []string{"instance", "source", "status"})
 
+	pendingGauge = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: metric.FamilyName,
+			Subsystem: "notify",
+			Name:      "pending_total",
+			Help:      "Counter of pending instance events",
+		}, []string{"instance", "source"})
+
+	pendingLatency = prometheus.NewSummaryVec(
+		prometheus.SummaryOpts{
+			Namespace:  metric.FamilyName,
+			Subsystem:  "notify",
+			Name:       "pending_durations_microseconds",
+			Help:       "Latency of pending instance events",
+			Objectives: metric.Pxx,
+		}, []string{"instance", "source"})
+
 	subscriberGauge = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: metric.FamilyName,
@@ -57,7 +74,7 @@ var (
 )
 
 func init() {
-	prometheus.MustRegister(notifyCounter, notifyLatency, subscriberGauge)
+	prometheus.MustRegister(notifyCounter, notifyLatency, subscriberGauge, pendingLatency, pendingGauge)
 }
 
 func ReportPublishCompleted(evt event.Event, err error) {
@@ -69,6 +86,14 @@ func ReportPublishCompleted(evt event.Event, err error) {
 	}
 	notifyLatency.WithLabelValues(instance, evt.Type().String(), status).Observe(elapsed)
 	notifyCounter.WithLabelValues(instance, evt.Type().String(), status).Inc()
+	pendingGauge.WithLabelValues(instance, evt.Type().String()).Dec()
+}
+
+func ReportPendingCompleted(evt event.Event) {
+	instance := metric.InstanceName()
+	elapsed := float64(time.Since(evt.CreateAt()).Nanoseconds()) / float64(time.Microsecond)
+	pendingLatency.WithLabelValues(instance, evt.Type().String()).Observe(elapsed)
+	pendingGauge.WithLabelValues(instance, evt.Type().String()).Inc()
 }
 
 func ReportSubscriber(domain, scheme string, n float64) {
diff --git a/server/connection/ws/websocket.go b/server/connection/ws/websocket.go
index 24a7e2b..380c9db 100644
--- a/server/connection/ws/websocket.go
+++ b/server/connection/ws/websocket.go
@@ -19,16 +19,19 @@ package ws
 
 import (
 	"context"
-	"fmt"
+	"errors"
+	"time"
+
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/server/connection"
 	serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
 	"github.com/gorilla/websocket"
-	"time"
 )
 
 const Websocket = "Websocket"
 
+var errServiceNotExist = errors.New("service does not exist")
+
 type WebSocket struct {
 	Options
 	Conn          *websocket.Conn
@@ -146,13 +149,11 @@ func (wh *WebSocket) CheckHealth(ctx context.Context) error {
 	}
 
 	if !serviceUtil.ServiceExist(ctx, wh.DomainProject, wh.ConsumerID) {
-		return fmt.Errorf("Service does not exist.")
+		return errServiceNotExist
 	}
 
 	remoteAddr := wh.Conn.RemoteAddr().String()
 	if err := wh.WritePingPong(websocket.PingMessage); err != nil {
-		log.Errorf(err, "send 'Ping' message to subscriber[%s] failed, consumer: %s",
-			remoteAddr, wh.ConsumerID)
 		return err
 	}
 
@@ -162,18 +163,7 @@ func (wh *WebSocket) CheckHealth(ctx context.Context) error {
 }
 
 func (wh *WebSocket) WritePingPong(messageType int) error {
-	err := wh.Conn.WriteControl(messageType, []byte{}, time.Now().Add(wh.SendTimeout))
-	if err != nil {
-		messageTypeName := "Ping"
-		if messageType == websocket.PongMessage {
-			messageTypeName = "Pong"
-		}
-		log.Errorf(err, "fail to send '%s' to subscriber[%s], consumer: %s",
-			messageTypeName, wh.Conn.RemoteAddr(), wh.ConsumerID)
-		//wh.subscriber.SetError(err)
-		return err
-	}
-	return nil
+	return wh.Conn.WriteControl(messageType, []byte{}, time.Now().Add(wh.SendTimeout))
 }
 
 func (wh *WebSocket) WriteTextMessage(message []byte) error {
diff --git a/server/connection/ws/websocket_test.go b/server/connection/ws/websocket_test.go
index ca40bd6..7518b5a 100644
--- a/server/connection/ws/websocket_test.go
+++ b/server/connection/ws/websocket_test.go
@@ -159,6 +159,6 @@ func TestWebSocket_CheckHealth(t *testing.T) {
 	})
 	t.Run("should return err when consumer not exist", func(t *testing.T) {
 		ws := wss.NewWebSocket("", "", mock.ServerConn)
-		assert.Equal(t, "Service does not exist.", ws.CheckHealth(context.Background()).Error())
+		assert.Equal(t, "service does not exist", ws.CheckHealth(context.Background()).Error())
 	})
 }
diff --git a/server/event/instance_subscriber.go b/server/event/instance_subscriber.go
index cb1895f..f270fcb 100644
--- a/server/event/instance_subscriber.go
+++ b/server/event/instance_subscriber.go
@@ -18,12 +18,13 @@
 package event
 
 import (
+	"errors"
 	"github.com/apache/servicecomb-service-center/pkg/event"
 	"github.com/apache/servicecomb-service-center/pkg/log"
-	"time"
+	"github.com/apache/servicecomb-service-center/server/connection"
 )
 
-const AddJobTimeout = 1 * time.Second
+var errBusy = errors.New("too busy")
 
 type InstanceSubscriber struct {
 	event.Subscriber
@@ -47,40 +48,49 @@ func (w *InstanceSubscriber) OnAccept() {
 }
 
 //被通知
-func (w *InstanceSubscriber) OnMessage(job event.Event) {
+func (w *InstanceSubscriber) OnMessage(evt event.Event) {
 	if w.Err() != nil {
 		return
 	}
 
-	wJob, ok := job.(*InstanceEvent)
+	wJob, ok := evt.(*InstanceEvent)
 	if !ok {
 		return
 	}
 	w.sendMessage(wJob)
 }
 
-func (w *InstanceSubscriber) sendMessage(job *InstanceEvent) {
+func (w *InstanceSubscriber) sendMessage(evt *InstanceEvent) {
 	defer log.Recover()
+
+	connection.ReportPendingCompleted(evt)
+
 	select {
-	case w.Job <- job:
+	case w.Job <- evt:
 	default:
-		timer := time.NewTimer(w.Timeout())
-		select {
-		case w.Job <- job:
-			timer.Stop()
-		case <-timer.C:
-			log.Errorf(nil,
-				"the %s watcher %s %s event queue is full[over %s], drop the event %v",
-				w.Type(), w.Group(), w.Subject(), w.Timeout(), job)
-		}
+		log.Errorf(nil, "the %s watcher %s %s event queue is full, drop the blocked events",
+			w.Type(), w.Group(), w.Subject())
+		w.cleanup()
+		w.Job <- evt
 	}
 }
 
-func (w *InstanceSubscriber) Timeout() time.Duration {
-	return AddJobTimeout
+func (w *InstanceSubscriber) cleanup() {
+	for {
+		select {
+		case evt, ok := <-w.Job:
+			if !ok {
+				return
+			}
+			connection.ReportPublishCompleted(evt, errBusy)
+		default:
+			return
+		}
+	}
 }
 
 func (w *InstanceSubscriber) Close() {
+	w.cleanup()
 	close(w.Job)
 }
 
diff --git a/server/metric/prometheus/metrics.go b/server/metric/prometheus/metrics.go
index 2c2c5c7..e456927 100644
--- a/server/metric/prometheus/metrics.go
+++ b/server/metric/prometheus/metrics.go
@@ -21,8 +21,10 @@ import (
 	"github.com/apache/servicecomb-service-center/pkg/rest"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 	"github.com/apache/servicecomb-service-center/server/metric"
+	api "github.com/apache/servicecomb-service-center/server/rest"
 	"github.com/prometheus/client_golang/prometheus"
 	"net/http"
+	"os"
 	"strconv"
 	"strings"
 	"time"
@@ -65,6 +67,9 @@ var (
 
 func init() {
 	prometheus.MustRegister(incomingRequests, successfulRequests, reqDurations, queryPerSeconds)
+	if "true" == os.Getenv("METRICS_ENABLE") {
+		api.RegisterServerHandler("/metrics", prometheus.Handler())
+	}
 }
 
 func ReportRequestCompleted(w http.ResponseWriter, r *http.Request, start time.Time) {
diff --git a/server/metric/prometheus/reporter.go b/server/metric/prometheus/reporter.go
index f089b90..32cc6df 100644
--- a/server/metric/prometheus/reporter.go
+++ b/server/metric/prometheus/reporter.go
@@ -18,7 +18,6 @@ package prometheus
 import (
 	"github.com/apache/servicecomb-service-center/server/metric"
 	dto "github.com/prometheus/client_model/go"
-	"os"
 )
 
 const (
@@ -65,9 +64,7 @@ func (r *APIReporter) toLabels(pairs []*dto.LabelPair) (labels []string) {
 }
 
 func init() {
-	if "true" == os.Getenv("METRICS_ENABLE") {
-		metric.RegisterReporter("rest", NewAPIReporter())
-	}
+	metric.RegisterReporter("rest", NewAPIReporter())
 }
 
 func NewAPIReporter() *APIReporter {
diff --git a/server/plugin/discovery/etcd/cacher_kv.go b/server/plugin/discovery/etcd/cacher_kv.go
index a61d1e3..5a9beea 100644
--- a/server/plugin/discovery/etcd/cacher_kv.go
+++ b/server/plugin/discovery/etcd/cacher_kv.go
@@ -451,6 +451,7 @@ func (c *KvCacher) notify(evts []discovery.KvEvent) {
 	for _, evt := range evts {
 		c.Cfg.OnEvent(evt)
 	}
+	discovery.ReportDispatchEventCompleted(c.Cfg.Key, evts)
 }
 
 func (c *KvCacher) doParse(src *mvccpb.KeyValue) (kv *discovery.KeyValue) {
diff --git a/server/plugin/discovery/metrics.go b/server/plugin/discovery/metrics.go
index 4cd41a2..ce434ca 100644
--- a/server/plugin/discovery/metrics.go
+++ b/server/plugin/discovery/metrics.go
@@ -39,10 +39,27 @@ var (
 			Help:       "Latency of backend events processing",
 			Objectives: metric.Pxx,
 		}, []string{"instance", "prefix"})
+
+	dispatchCounter = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: metric.FamilyName,
+			Subsystem: "db",
+			Name:      "dispatch_event_total",
+			Help:      "Counter of backend events dispatch",
+		}, []string{"instance", "prefix"})
+
+	dispatchLatency = prometheus.NewSummaryVec(
+		prometheus.SummaryOpts{
+			Namespace:  metric.FamilyName,
+			Subsystem:  "db",
+			Name:       "dispatch_event_durations_microseconds",
+			Help:       "Latency of backend events dispatch",
+			Objectives: metric.Pxx,
+		}, []string{"instance", "prefix"})
 )
 
 func init() {
-	prometheus.MustRegister(eventsCounter, eventsLatency)
+	prometheus.MustRegister(eventsCounter, eventsLatency, dispatchCounter, dispatchLatency)
 }
 
 func ReportProcessEventCompleted(prefix string, evts []KvEvent) {
@@ -57,4 +74,19 @@ func ReportProcessEventCompleted(prefix string, evts []KvEvent) {
 		eventsLatency.WithLabelValues(instance, prefix).Observe(elapsed)
 	}
 	eventsCounter.WithLabelValues(instance, prefix).Add(l)
+	dispatchCounter.WithLabelValues(instance, prefix).Add(l)
+}
+
+func ReportDispatchEventCompleted(prefix string, evts []KvEvent) {
+	l := float64(len(evts))
+	if l == 0 {
+		return
+	}
+	instance := metric.InstanceName()
+	now := time.Now()
+	for _, evt := range evts {
+		elapsed := float64(now.Sub(evt.CreateAt.Local()).Nanoseconds()) / float64(time.Microsecond)
+		dispatchLatency.WithLabelValues(instance, prefix).Observe(elapsed)
+	}
+	dispatchCounter.WithLabelValues(instance, prefix).Add(-l)
 }
diff --git a/server/plugin/discovery/servicecenter/common.go b/server/plugin/discovery/servicecenter/common.go
index d59957f..a107db1 100644
--- a/server/plugin/discovery/servicecenter/common.go
+++ b/server/plugin/discovery/servicecenter/common.go
@@ -1,17 +1,19 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package servicecenter
 
diff --git a/server/service/event/instance_event_handler.go b/server/service/event/instance_event_handler.go
index 0d6010d..46f9b3a 100644
--- a/server/service/event/instance_event_handler.go
+++ b/server/service/event/instance_event_handler.go
@@ -18,6 +18,9 @@ package event
 
 import (
 	"context"
+	"fmt"
+	"strings"
+
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	pb "github.com/apache/servicecomb-service-center/pkg/registry"
 	"github.com/apache/servicecomb-service-center/pkg/util"
@@ -29,7 +32,6 @@ import (
 	"github.com/apache/servicecomb-service-center/server/service/cache"
 	"github.com/apache/servicecomb-service-center/server/service/metrics"
 	serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
-	"strings"
 )
 
 const (
@@ -57,12 +59,15 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
 	domainName := domainProject[:idx]
 	projectName := domainProject[idx+1:]
 
+	ctx := context.WithValue(context.WithValue(context.Background(),
+		util.CtxCacheOnly, "1"),
+		util.CtxGlobal, "1")
+
 	var count float64 = increaseOne
-	switch action {
-	case pb.EVT_INIT:
+	if action == pb.EVT_INIT {
 		metrics.ReportInstances(domainName, count)
-		ms := serviceUtil.GetServiceFromCache(domainProject, providerID)
-		if ms == nil {
+		ms, err := serviceUtil.GetService(ctx, domainProject, providerID)
+		if err != nil {
 			log.Warnf("caught [%s] instance[%s/%s] event, endpoints %v, get cached provider's file failed",
 				action, providerID, providerInstanceID, instance.Endpoints)
 			return
@@ -70,11 +75,10 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
 		frameworkName, frameworkVersion := getFramework(ms)
 		metrics.ReportFramework(domainName, projectName, frameworkName, frameworkVersion, count)
 		return
-	case pb.EVT_CREATE:
-		metrics.ReportInstances(domainName, count)
-	case pb.EVT_DELETE:
+	}
+
+	if action == pb.EVT_DELETE {
 		count = decreaseOne
-		metrics.ReportInstances(domainName, count)
 		if !apt.IsDefaultDomainProject(domainProject) {
 			projectName := domainProject[idx+1:]
 			serviceUtil.RemandInstanceQuota(
@@ -82,25 +86,25 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
 		}
 	}
 
-	if event.Center().Closed() {
-		log.Warnf("caught [%s] instance[%s/%s] event, endpoints %v, but notify service is closed",
-			action, providerID, providerInstanceID, instance.Endpoints)
+	// Query the provider service's version information
+	ms, err := serviceUtil.GetService(ctx, domainProject, providerID)
+	if err != nil {
+		log.Error(fmt.Sprintf("caught [%s] instance[%s/%s] event, endpoints %v, get cached provider's file failed",
+			action, providerID, providerInstanceID, instance.Endpoints), err)
 		return
 	}
 
-	// 查询服务版本信息
-	ctx := context.WithValue(context.WithValue(context.Background(),
-		util.CtxCacheOnly, "1"),
-		util.CtxGlobal, "1")
-	ms, err := serviceUtil.GetService(ctx, domainProject, providerID)
-	if ms == nil {
-		log.Errorf(err, "caught [%s] instance[%s/%s] event, endpoints %v, get cached provider's file failed",
+	if event.Center().Closed() {
+		log.Warnf("caught [%s] instance[%s/%s] event, endpoints %v, but notify service is closed",
 			action, providerID, providerInstanceID, instance.Endpoints)
 		return
 	}
 
-	frameworkName, frameworkVersion := getFramework(ms)
-	metrics.ReportFramework(domainName, projectName, frameworkName, frameworkVersion, count)
+	if action != pb.EVT_UPDATE {
+		frameworkName, frameworkVersion := getFramework(ms)
+		metrics.ReportInstances(domainName, count)
+		metrics.ReportFramework(domainName, projectName, frameworkName, frameworkVersion, count)
+	}
 
 	log.Infof("caught [%s] service[%s][%s/%s/%s/%s] instance[%s] event, endpoints %v",
 		action, providerID, ms.Environment, ms.AppId, ms.ServiceName, ms.Version,
@@ -135,7 +139,6 @@ func PublishInstanceEvent(evt discovery.KvEvent, domainProject string, serviceKe
 		Instance: evt.KV.Value.(*pb.MicroServiceInstance),
 	}
 	for _, consumerID := range subscribers {
-		// TODO add超时怎么处理?
 		evt := event.NewInstanceEventWithTime(consumerID, domainProject, evt.Revision, evt.CreateAt, response)
 		err := event.Center().Fire(evt)
 		if err != nil {
diff --git a/server/service/util/microservice_util.go b/server/service/util/microservice_util.go
index c10f33f..987d60d 100644
--- a/server/service/util/microservice_util.go
+++ b/server/service/util/microservice_util.go
@@ -64,15 +64,6 @@ func GetService(ctx context.Context, domainProject string, serviceID string) (*p
 	return serviceResp.Kvs[0].Value.(*pb.MicroService), nil
 }
 
-// GetServiceFromCache gets service from cache
-func GetServiceFromCache(domainProject string, serviceID string) *pb.MicroService {
-	ctx := context.WithValue(context.WithValue(context.Background(),
-		util.CtxCacheOnly, "1"),
-		util.CtxGlobal, "1")
-	svc, _ := GetService(ctx, domainProject, serviceID)
-	return svc
-}
-
 func getServicesRawData(ctx context.Context, domainProject string) ([]*discovery.KeyValue, error) {
 	key := apt.GenerateServiceKey(domainProject, "")
 	opts := append(FromContext(ctx),