You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this copy.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/05/20 01:45:30 UTC

[servicecomb-service-center] branch master updated: SCB-2176 Refactor websocket (#986)

This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new cc0f684  SCB-2176 Refactor websocket (#986)
cc0f684 is described below

commit cc0f684449f6606c45c86d53508428a50eb32b84
Author: little-cui <su...@qq.com>
AuthorDate: Thu May 20 09:45:23 2021 +0800

    SCB-2176 Refactor websocket (#986)
    
    * SCB-2176 Refactor event bus
    
    * SCB-2176 Fix: UT failures
    
    * SCB-2176 Refactor websocket (#980)
    
    * SCB-2176 Refactor websocket
    
    * SCB-2176 Add UTs
    
    * SCB-2176 Fix: UT failures
    
    * SCB-2176 Fix: websocket health check failed
    
    * SCB-2176 Add benchmark for 10K connection
    
    * SCB-2176 Add benchmark for 10K connection
    
    * SCB-2176 Add event metrics
---
 datasource/etcd/event/instance_event_handler.go    |  29 +-
 datasource/etcd/sd/etcd/cacher_kv.go               |   1 +
 datasource/etcd/sd/metrics.go                      |  32 +++
 datasource/etcd/sd/servicecenter/common.go         |  30 +-
 datasource/etcd/util/microservice_util.go          |   7 -
 docs/user-guides/metrics.md                        |   4 +
 examples/infrastructures/docker/README.md          |   2 +-
 integration/apis.go                                |   1 -
 integration/health-metrics-grafana.json            | 174 ++++++++----
 integration/instances_test.go                      | 133 ++++++++-
 pkg/event/bus.go                                   |   6 +-
 pkg/event/subscriber.go                            |   5 +-
 pkg/proto/service_ex.go                            |   1 -
 pkg/util/context.go                                |  14 +-
 server/connection/grpc/stream.go                   |   7 +-
 server/connection/grpc/stream_test.go              |   4 +-
 server/connection/ws/broker.go                     |  81 ++++++
 .../connection/ws/broker_test.go                   |  28 +-
 server/connection/ws/common.go                     |  68 +++++
 .../connection/ws/common_test.go                   |  33 ++-
 .../ws/{publisher.go => health_check.go}           |  87 +++---
 .../ws/health_check_test.go}                       |  28 +-
 .../ws/options.go}                                 |  23 +-
 server/connection/ws/websocket.go                  | 307 +++++++--------------
 server/connection/ws/websocket_test.go             | 174 +++++++-----
 server/event/instance_event.go                     |  51 ++++
 server/event/instance_subscriber.go                | 131 +++------
 server/metrics/connection.go                       |  26 +-
 server/rest/controller/v3/instance_watcher.go      |   1 -
 server/rest/controller/v4/instance_watcher.go      |  14 -
 server/service/watch.go                            |  15 +-
 server/service/watch_test.go                       |   7 -
 32 files changed, 904 insertions(+), 620 deletions(-)

diff --git a/datasource/etcd/event/instance_event_handler.go b/datasource/etcd/event/instance_event_handler.go
index 7297b7c..b247c2a 100644
--- a/datasource/etcd/event/instance_event_handler.go
+++ b/datasource/etcd/event/instance_event_handler.go
@@ -22,8 +22,6 @@ import (
 	"fmt"
 	"strings"
 
-	pb "github.com/go-chassis/cari/discovery"
-
 	"github.com/apache/servicecomb-service-center/datasource/etcd/cache"
 	"github.com/apache/servicecomb-service-center/datasource/etcd/kv"
 	"github.com/apache/servicecomb-service-center/datasource/etcd/path"
@@ -36,6 +34,7 @@ import (
 	"github.com/apache/servicecomb-service-center/server/event"
 	"github.com/apache/servicecomb-service-center/server/metrics"
 	"github.com/apache/servicecomb-service-center/server/syncernotify"
+	pb "github.com/go-chassis/cari/discovery"
 )
 
 const (
@@ -65,12 +64,13 @@ func (h *InstanceEventHandler) OnEvent(evt sd.KvEvent) {
 	domainName := domainProject[:idx]
 	projectName := domainProject[idx+1:]
 
+	ctx := util.WithGlobal(util.WithCacheOnly(context.Background()))
+
 	var count float64 = increaseOne
-	switch action {
-	case pb.EVT_INIT:
+	if action == pb.EVT_INIT {
 		metrics.ReportInstances(domainName, count)
-		ms := serviceUtil.GetServiceFromCache(domainProject, providerID)
-		if ms == nil {
+		ms, err := serviceUtil.GetService(ctx, domainProject, providerID)
+		if err != nil {
 			log.Warnf("caught [%s] instance[%s/%s] event, endpoints %v, get cached provider's file failed",
 				action, providerID, providerInstanceID, instance.Endpoints)
 			return
@@ -78,11 +78,10 @@ func (h *InstanceEventHandler) OnEvent(evt sd.KvEvent) {
 		frameworkName, frameworkVersion := getFramework(ms)
 		metrics.ReportFramework(domainName, projectName, frameworkName, frameworkVersion, count)
 		return
-	case pb.EVT_CREATE:
-		metrics.ReportInstances(domainName, count)
-	case pb.EVT_DELETE:
+	}
+
+	if action == pb.EVT_DELETE {
 		count = decreaseOne
-		metrics.ReportInstances(domainName, count)
 		if !core.IsDefaultDomainProject(domainProject) {
 			projectName := domainProject[idx+1:]
 			serviceUtil.RemandInstanceQuota(
@@ -91,9 +90,7 @@ func (h *InstanceEventHandler) OnEvent(evt sd.KvEvent) {
 	}
 
 	// 查询服务版本信息
-	ctx := util.WithGlobal(util.WithCacheOnly(context.Background()))
 	ms, err := serviceUtil.GetService(ctx, domainProject, providerID)
-
 	if err != nil {
 		log.Error(fmt.Sprintf("caught [%s] instance[%s/%s] event, endpoints %v, get cached provider's file failed",
 			action, providerID, providerInstanceID, instance.Endpoints), err)
@@ -110,8 +107,11 @@ func (h *InstanceEventHandler) OnEvent(evt sd.KvEvent) {
 		return
 	}
 
-	frameworkName, frameworkVersion := getFramework(ms)
-	metrics.ReportFramework(domainName, projectName, frameworkName, frameworkVersion, count)
+	if action != pb.EVT_UPDATE {
+		frameworkName, frameworkVersion := getFramework(ms)
+		metrics.ReportInstances(domainName, count)
+		metrics.ReportFramework(domainName, projectName, frameworkName, frameworkVersion, count)
+	}
 
 	log.Infof("caught [%s] service[%s][%s/%s/%s/%s] instance[%s] event, endpoints %v",
 		action, providerID, ms.Environment, ms.AppId, ms.ServiceName, ms.Version,
@@ -146,7 +146,6 @@ func PublishInstanceEvent(evt sd.KvEvent, domainProject string, serviceKey *pb.M
 		Instance: evt.KV.Value.(*pb.MicroServiceInstance),
 	}
 	for _, consumerID := range subscribers {
-		// TODO add超时怎么处理?
 		evt := event.NewInstanceEventWithTime(consumerID, domainProject, evt.Revision, evt.CreateAt, response)
 		err := event.Center().Fire(evt)
 		if err != nil {
diff --git a/datasource/etcd/sd/etcd/cacher_kv.go b/datasource/etcd/sd/etcd/cacher_kv.go
index 11eb70a..4606fab 100644
--- a/datasource/etcd/sd/etcd/cacher_kv.go
+++ b/datasource/etcd/sd/etcd/cacher_kv.go
@@ -454,6 +454,7 @@ func (c *KvCacher) notify(evts []sd.KvEvent) {
 	for _, evt := range evts {
 		c.Cfg.OnEvent(evt)
 	}
+	sd.ReportDispatchEventCompleted(c.Cfg.Key, evts)
 }
 
 func (c *KvCacher) doParse(src *sdcommon.Resource) (kv *sd.KeyValue) {
diff --git a/datasource/etcd/sd/metrics.go b/datasource/etcd/sd/metrics.go
index 82e38c0..f25e30b 100644
--- a/datasource/etcd/sd/metrics.go
+++ b/datasource/etcd/sd/metrics.go
@@ -43,6 +43,23 @@ var (
 			Help:       "Latency of backend events processing",
 			Objectives: metrics.Pxx,
 		}, []string{"instance", "prefix"})
+
+	dispatchCounter = helper.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: metrics.FamilyName,
+			Subsystem: "db",
+			Name:      "dispatch_event_total",
+			Help:      "Counter of backend events dispatch",
+		}, []string{"instance", "prefix"})
+
+	dispatchLatency = helper.NewSummaryVec(
+		prometheus.SummaryOpts{
+			Namespace:  metrics.FamilyName,
+			Subsystem:  "db",
+			Name:       "dispatch_event_durations_microseconds",
+			Help:       "Latency of backend events dispatch",
+			Objectives: metrics.Pxx,
+		}, []string{"instance", "prefix"})
 )
 
 func ReportProcessEventCompleted(prefix string, evts []KvEvent) {
@@ -57,4 +74,19 @@ func ReportProcessEventCompleted(prefix string, evts []KvEvent) {
 		eventsLatency.WithLabelValues(instance, prefix).Observe(elapsed)
 	}
 	eventsCounter.WithLabelValues(instance, prefix).Add(l)
+	dispatchCounter.WithLabelValues(instance, prefix).Add(l)
+}
+
+func ReportDispatchEventCompleted(prefix string, evts []KvEvent) {
+	l := float64(len(evts))
+	if l == 0 {
+		return
+	}
+	instance := metrics.InstanceName()
+	now := time.Now()
+	for _, evt := range evts {
+		elapsed := float64(now.Sub(evt.CreateAt.Local()).Nanoseconds()) / float64(time.Microsecond)
+		dispatchLatency.WithLabelValues(instance, prefix).Observe(elapsed)
+	}
+	dispatchCounter.WithLabelValues(instance, prefix).Add(-l)
 }
diff --git a/datasource/etcd/sd/servicecenter/common.go b/datasource/etcd/sd/servicecenter/common.go
index d59957f..a107db1 100644
--- a/datasource/etcd/sd/servicecenter/common.go
+++ b/datasource/etcd/sd/servicecenter/common.go
@@ -1,17 +1,19 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package servicecenter
 
diff --git a/datasource/etcd/util/microservice_util.go b/datasource/etcd/util/microservice_util.go
index d7c30ef..6c45a06 100644
--- a/datasource/etcd/util/microservice_util.go
+++ b/datasource/etcd/util/microservice_util.go
@@ -64,13 +64,6 @@ func GetService(ctx context.Context, domainProject string, serviceID string) (*p
 	return serviceResp.Kvs[0].Value.(*pb.MicroService), nil
 }
 
-// GetServiceFromCache gets service from cache
-func GetServiceFromCache(domainProject string, serviceID string) *pb.MicroService {
-	ctx := util.WithGlobal(util.WithCacheOnly(context.Background()))
-	svc, _ := GetService(ctx, domainProject, serviceID)
-	return svc
-}
-
 func getServicesRawData(ctx context.Context, domainProject string) ([]*sd.KeyValue, error) {
 	key := path.GenerateServiceKey(domainProject, "")
 	opts := append(FromContext(ctx),
diff --git a/docs/user-guides/metrics.md b/docs/user-guides/metrics.md
index e3ead16..f815756 100644
--- a/docs/user-guides/metrics.md
+++ b/docs/user-guides/metrics.md
@@ -11,6 +11,8 @@
 ### Pub/Sub
 1. notify_publish_total
 1. notify_publish_durations_microseconds
+1. notify_pending_total
+1. notify_pending_durations_microseconds
 1. notify_subscriber_total
 
 ### Meta
@@ -25,6 +27,8 @@
 ### Backend
 1. db_backend_event_total
 1. db_backend_event_durations_microseconds
+1. db_dispatch_event_total
+1. db_dispatch_event_durations_microseconds
 1. db_backend_operation_total
 1. db_backend_operation_durations_microseconds
 1. db_backend_total
diff --git a/examples/infrastructures/docker/README.md b/examples/infrastructures/docker/README.md
index 1955c64..90c3b72 100644
--- a/examples/infrastructures/docker/README.md
+++ b/examples/infrastructures/docker/README.md
@@ -5,7 +5,7 @@ A simple demo to deploy ServiceCenter in docker environment.
 ## Quick Start
 
 ```bash
-cd $PROJECT_ROOT/integration/docker
+cd examples/infrastructures/docker
 docker-compose up
 ```
 This will start up ServiceCenter listening on `:30100` for handling requests and Dashboard listening on `:30103`.
diff --git a/integration/apis.go b/integration/apis.go
index d263a6b..f8aa164 100644
--- a/integration/apis.go
+++ b/integration/apis.go
@@ -47,7 +47,6 @@ var UPDATEINSTANCEMETADATA = "/v4/default/registry/microservices/:serviceId/inst
 var UPDATEINSTANCESTATUS = "/v4/default/registry/microservices/:serviceId/instances/:instanceId/status"
 var INSTANCEHEARTBEAT = "/v4/default/registry/microservices/:serviceId/instances/:instanceId/heartbeat"
 var INSTANCEWATCHER = "/v4/default/registry/microservices/:serviceId/watcher"
-var INSTANCELISTWATCHER = "/v4/default/registry/microservices/:serviceId/listwatcher"
 
 //Governance API's
 var GETGOVERNANCESERVICEDETAILS = "/v4/default/govern/microservices/:serviceId"
diff --git a/integration/health-metrics-grafana.json b/integration/health-metrics-grafana.json
index f730b2c..84f963b 100644
--- a/integration/health-metrics-grafana.json
+++ b/integration/health-metrics-grafana.json
@@ -1,18 +1,3 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License
-
 {
   "__inputs": [
     {
@@ -2183,7 +2168,7 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": "sum(irate(service_center_db_backend_event_total{job=\"service-center\"}[1m])) by (instance,prefix)",
+          "expr": "sum(irate(service_center_db_dispatch_event_total{job=\"service-center\"}[1m])) by (instance,prefix)",
           "format": "time_series",
           "intervalFactor": 1,
           "legendFormat": "{{instance}}> {{prefix}}",
@@ -2193,7 +2178,7 @@
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Publish Events",
+      "title": "Produce Events",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2277,7 +2262,7 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": "max(avg_over_time(service_center_db_backend_event_durations_microseconds{job=\"service-center\"}[1m])) by (instance, prefix)",
+          "expr": "max(avg_over_time(service_center_db_dispatch_event_durations_microseconds{job=\"service-center\"}[1m])) by (instance, prefix)",
           "format": "time_series",
           "intervalFactor": 1,
           "legendFormat": "{{instance}}> {{prefix}}",
@@ -2287,7 +2272,7 @@
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Publish Events Latency",
+      "title": "Produce Events Latency",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2381,7 +2366,7 @@
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Subscribe Events",
+      "title": "Post Events",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2475,7 +2460,7 @@
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Subscribe Events Latency",
+      "title": "Post Events Latency",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2526,8 +2511,91 @@
         "x": 0,
         "y": 40
       },
+      "id": 48,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": false,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "sum(service_center_notify_pending_total{job=\"service-center\"}) by (instance,source)",
+          "format": "time_series",
+          "intervalFactor": 1,
+          "refId": "A"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Pending Events",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_LOCAL}",
+      "fill": 1,
+      "gridPos": {
+        "h": 6,
+        "w": 6,
+        "x": 6,
+        "y": 40
+      },
       "height": "",
-      "id": 37,
+      "id": 43,
       "legend": {
         "alignAsTable": false,
         "avg": false,
@@ -2558,18 +2626,17 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": "sum(irate(service_center_db_backend_operation_total{job=\"service-center\"}[1m])) by (instance,operation)",
+          "expr": "service_center_notify_subscriber_total{job=\"service-center\"}",
           "format": "time_series",
-          "instant": false,
           "intervalFactor": 1,
-          "legendFormat": "{{instance}}> {{operation}}",
-          "refId": "A"
+          "legendFormat": "{{instance}}> {{domain}} {{scheme}}",
+          "refId": "B"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Backend Operations",
+      "title": "Subscribers",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2587,7 +2654,7 @@
       "yaxes": [
         {
           "decimals": 0,
-          "format": "ops",
+          "format": "none",
           "label": null,
           "logBase": 1,
           "max": null,
@@ -2618,25 +2685,25 @@
       "gridPos": {
         "h": 6,
         "w": 6,
-        "x": 6,
+        "x": 12,
         "y": 40
       },
       "height": "",
-      "id": 38,
+      "id": 37,
       "legend": {
         "alignAsTable": false,
-        "avg": true,
+        "avg": false,
         "current": false,
         "hideEmpty": true,
         "hideZero": true,
-        "max": true,
-        "min": true,
-        "rightSide": true,
+        "max": false,
+        "min": false,
+        "rightSide": false,
         "show": false,
         "sort": "avg",
         "sortDesc": true,
         "total": false,
-        "values": true
+        "values": false
       },
       "lines": true,
       "linewidth": 1,
@@ -2653,17 +2720,18 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": "max(avg_over_time(service_center_db_backend_operation_durations_microseconds{job=\"service-center\"}[1m])) by (instance,operation)",
+          "expr": "sum(irate(service_center_db_backend_operation_total{job=\"service-center\"}[1m])) by (instance,operation)",
           "format": "time_series",
+          "instant": false,
           "intervalFactor": 1,
           "legendFormat": "{{instance}}> {{operation}}",
-          "refId": "B"
+          "refId": "A"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Backend Operations Latency",
+      "title": "Backend Operations",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2680,7 +2748,8 @@
       },
       "yaxes": [
         {
-          "format": "µs",
+          "decimals": 0,
+          "format": "ops",
           "label": null,
           "logBase": 1,
           "max": null,
@@ -2711,25 +2780,25 @@
       "gridPos": {
         "h": 6,
         "w": 6,
-        "x": 12,
+        "x": 18,
         "y": 40
       },
       "height": "",
-      "id": 43,
+      "id": 38,
       "legend": {
         "alignAsTable": false,
-        "avg": false,
+        "avg": true,
         "current": false,
         "hideEmpty": true,
         "hideZero": true,
-        "max": false,
-        "min": false,
-        "rightSide": false,
+        "max": true,
+        "min": true,
+        "rightSide": true,
         "show": false,
         "sort": "avg",
         "sortDesc": true,
         "total": false,
-        "values": false
+        "values": true
       },
       "lines": true,
       "linewidth": 1,
@@ -2746,17 +2815,17 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": "service_center_notify_subscriber_total{job=\"service-center\"}",
+          "expr": "max(avg_over_time(service_center_db_backend_operation_durations_microseconds{job=\"service-center\"}[1m])) by (instance,operation)",
           "format": "time_series",
           "intervalFactor": 1,
-          "legendFormat": "{{instance}}> {{domain}} {{scheme}}",
+          "legendFormat": "{{instance}}> {{operation}}",
           "refId": "B"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Subscribers",
+      "title": "Backend Operations Latency",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2773,8 +2842,7 @@
       },
       "yaxes": [
         {
-          "decimals": 0,
-          "format": "none",
+          "format": "µs",
           "label": null,
           "logBase": 1,
           "max": null,
@@ -3434,7 +3502,7 @@
     "list": []
   },
   "time": {
-    "from": "now-5m",
+    "from": "now-30m",
     "to": "now"
   },
   "timepicker": {
@@ -3465,5 +3533,5 @@
   "timezone": "",
   "title": "ServiceCenter",
   "uid": "Zg6NoHGiz",
-  "version": 12
+  "version": 17
 }
\ No newline at end of file
diff --git a/integration/instances_test.go b/integration/instances_test.go
index d4fd6b0..1f24fc7 100644
--- a/integration/instances_test.go
+++ b/integration/instances_test.go
@@ -18,8 +18,13 @@ package integrationtest_test
 
 import (
 	"encoding/json"
+	"fmt"
+	"github.com/go-chassis/cari/discovery"
+	"github.com/gorilla/websocket"
+	"github.com/stretchr/testify/assert"
 	"net/http"
 	"strings"
+	"sync"
 
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
@@ -601,18 +606,6 @@ var _ = Describe("MicroService Api Test", func() {
 
 				Expect(resp.StatusCode).To(Equal(http.StatusBadRequest))
 			})
-			It("Call the listwatcher API ", func() {
-				//This api gives 400 bad request for the integration test
-				// as integration test is not able to make ws connection
-				url := strings.Replace(INSTANCELISTWATCHER, ":serviceId", serviceId, 1)
-				req, _ := http.NewRequest(GET, SCURL+url, nil)
-				req.Header.Set("X-Domain-Name", "default")
-				resp, err := scclient.Do(req)
-				Expect(err).To(BeNil())
-				defer resp.Body.Close()
-
-				Expect(resp.StatusCode).To(Equal(http.StatusBadRequest))
-			})
 		})
 	})
 
@@ -705,3 +698,119 @@ func BenchmarkRegisterMicroServiceInstance(b *testing.B) {
 		Expect(resp.StatusCode).To(Equal(http.StatusOK))
 	}
 }
+
+func BenchmarkInstanceWatch(t *testing.B) {
+	scclient = insecurityConnection
+	var serviceId, instanceId string
+
+	t.Run("prepare data", func(t *testing.B) {
+		// service
+		serviceName := "testInstance" + strconv.Itoa(rand.Int())
+		servicemap := map[string]interface{}{
+			"serviceName": serviceName,
+			"appId":       "testApp",
+			"version":     "1.0",
+		}
+		bodyParams := map[string]interface{}{
+			"service": servicemap,
+		}
+		body, _ := json.Marshal(bodyParams)
+		bodyBuf := bytes.NewReader(body)
+		req, _ := http.NewRequest(POST, SCURL+REGISTERMICROSERVICE, bodyBuf)
+		req.Header.Set("X-Domain-Name", "default")
+		resp, err := scclient.Do(req)
+		assert.NoError(t, err)
+		assert.Equal(t, http.StatusOK, resp.StatusCode)
+		respbody, _ := ioutil.ReadAll(resp.Body)
+		serviceId = gojson.Json(string(respbody)).Get("serviceId").Tostring()
+		resp.Body.Close()
+
+		// instance
+		healthcheck := map[string]interface{}{
+			"mode":     "push",
+			"interval": 30000,
+			"times":    20000,
+		}
+		instance := map[string]interface{}{
+			"hostName":    "cse",
+			"healthCheck": healthcheck,
+		}
+		bodyParams = map[string]interface{}{
+			"instance": instance,
+		}
+		body, _ = json.Marshal(bodyParams)
+		bodyBuf = bytes.NewReader(body)
+		req, _ = http.NewRequest(POST, SCURL+strings.Replace(REGISTERINSTANCE, ":serviceId", serviceId, 1), bodyBuf)
+		req.Header.Set("X-Domain-Name", "default")
+		resp, err = scclient.Do(req)
+		assert.NoError(t, err)
+		respbody, _ = ioutil.ReadAll(resp.Body)
+		instanceId = gojson.Json(string(respbody)).Get("instanceId").Tostring()
+		resp.Body.Close()
+
+		req, _ = http.NewRequest(GET, SCURL+FINDINSTANCE+"?appId=testApp&serviceName="+serviceName+"&version=1.0", nil)
+		req.Header.Set("X-Domain-Name", "default")
+		req.Header.Set("X-ConsumerId", serviceId)
+		resp, err = scclient.Do(req)
+		assert.NoError(t, err)
+		resp.Body.Close()
+
+		assert.Equal(t, http.StatusOK, resp.StatusCode)
+	})
+
+	t.Run("test 10K connection", func(t *testing.B) {
+
+		const N, E = 2500, 2500
+		var okWg sync.WaitGroup
+		okWg.Add(N)
+
+		t.Run("new 10K connection", func(t *testing.B) {
+			url := strings.ReplaceAll(strings.ReplaceAll(SCURL, "http://", "ws://")+INSTANCEWATCHER, ":serviceId", serviceId)
+			for i := 0; i < N; i++ {
+				go func() {
+					conn, _, err := websocket.DefaultDialer.Dial(url, nil)
+					assert.NoError(t, err)
+					for {
+						_ = conn.SetReadDeadline(time.Now().Add(30 * time.Second))
+						_, data, err := conn.ReadMessage()
+						if err != nil {
+							okWg.Done()
+							return
+						}
+
+						var response discovery.WatchInstanceResponse
+						_ = json.Unmarshal(data, &response)
+						instance := response.Instance
+						timestamp, _ := strconv.ParseInt(instance.ModTimestamp, 10, 64)
+						sub := time.Now().Sub(time.Unix(timestamp, 0))
+						fmt.Println(instance.Properties["tag"], sub)
+					}
+				}()
+			}
+			<-time.After(10 * time.Second)
+		})
+
+		t.Run("fire 10K event", func(t *testing.B) {
+			for i := 0; i < E; i++ {
+				propertiesInstance := map[string]interface{}{
+					"tag": strconv.Itoa(i),
+				}
+				bodyParams := map[string]interface{}{
+					"properties": propertiesInstance,
+				}
+				url := strings.Replace(UPDATEINSTANCEMETADATA, ":serviceId", serviceId, 1)
+				url = strings.Replace(url, ":instanceId", instanceId, 1)
+				body, _ := json.Marshal(bodyParams)
+				bodyBuf := bytes.NewReader(body)
+				req, _ := http.NewRequest(UPDATE, SCURL+url, bodyBuf)
+				req.Header.Set("X-Domain-Name", "default")
+				resp, _ := scclient.Do(req)
+				resp.Body.Close()
+			}
+		})
+
+		t.Run("wait", func(t *testing.B) {
+			okWg.Wait()
+		})
+	})
+}
diff --git a/pkg/event/bus.go b/pkg/event/bus.go
index a573381..4e7fe77 100644
--- a/pkg/event/bus.go
+++ b/pkg/event/bus.go
@@ -41,14 +41,14 @@ func (bus *Bus) Fire(evt Event) {
 	bus.Add(queue.Task{Payload: evt})
 }
 
-func (bus *Bus) Handle(ctx context.Context, evt interface{}) {
-	bus.fireAtOnce(evt.(Event))
+func (bus *Bus) Handle(ctx context.Context, payload interface{}) {
+	bus.fireAtOnce(payload.(Event))
 }
 
 func (bus *Bus) fireAtOnce(evt Event) {
 	if itf, ok := bus.subjects.Get(evt.Subject()); ok {
 		itf.(*Poster).Post(evt)
-	}
+	} // else the evt will be discard
 }
 
 func (bus *Bus) Subjects(name string) *Poster {
diff --git a/pkg/event/subscriber.go b/pkg/event/subscriber.go
index b26060b..df771dd 100644
--- a/pkg/event/subscriber.go
+++ b/pkg/event/subscriber.go
@@ -31,12 +31,15 @@ type Subscriber interface {
 	Bus() *BusService
 	SetBus(*BusService)
 
+	// Err event bus remove subscriber automatically, if return not nil.
+	// Implement of OnMessage should call SetError when run exception
 	Err() error
 	SetError(err error)
 
 	Close()
+	// OnAccept call when subscriber appended in event bus successfully
 	OnAccept()
-	// The event bus will callback this function, so it must be non-blocked.
+	// OnMessage call when event bus fire a msg, it must be non-blocked
 	OnMessage(Event)
 }
 
diff --git a/pkg/proto/service_ex.go b/pkg/proto/service_ex.go
index ce955d3..64c0b9e 100644
--- a/pkg/proto/service_ex.go
+++ b/pkg/proto/service_ex.go
@@ -30,7 +30,6 @@ type ServiceInstanceCtrlServerEx interface {
 	BatchFind(ctx context.Context, in *discovery.BatchFindInstancesRequest) (*discovery.BatchFindInstancesResponse, error)
 
 	WebSocketWatch(ctx context.Context, in *discovery.WatchInstanceRequest, conn *websocket.Conn)
-	WebSocketListAndWatch(ctx context.Context, in *discovery.WatchInstanceRequest, conn *websocket.Conn)
 
 	ClusterHealth(ctx context.Context) (*discovery.GetInstancesResponse, error)
 }
diff --git a/pkg/util/context.go b/pkg/util/context.go
index b77bbc2..2f5b621 100644
--- a/pkg/util/context.go
+++ b/pkg/util/context.go
@@ -20,6 +20,7 @@ package util
 import (
 	"context"
 	"net/http"
+	"strings"
 	"time"
 )
 
@@ -28,6 +29,7 @@ const (
 	CtxProject       CtxKey = "project"
 	CtxTargetDomain  CtxKey = "target-domain"
 	CtxTargetProject CtxKey = "target-project"
+	SPLIT                   = "/"
 )
 
 type StringContext struct {
@@ -119,11 +121,11 @@ func SetRequestContext(r *http.Request, key CtxKey, val interface{}) *http.Reque
 }
 
 func ParseDomainProject(ctx context.Context) string {
-	return ParseDomain(ctx) + "/" + ParseProject(ctx)
+	return ParseDomain(ctx) + SPLIT + ParseProject(ctx)
 }
 
 func ParseTargetDomainProject(ctx context.Context) string {
-	return ParseTargetDomain(ctx) + "/" + ParseTargetProject(ctx)
+	return ParseTargetDomain(ctx) + SPLIT + ParseTargetProject(ctx)
 }
 
 func ParseDomain(ctx context.Context) string {
@@ -178,6 +180,14 @@ func SetDomainProject(ctx context.Context, domain string, project string) contex
 	return SetProject(SetDomain(ctx, domain), project)
 }
 
+func SetDomainProjectString(ctx context.Context, domainProject string) context.Context {
+	arr := strings.Split(domainProject, SPLIT)
+	if len(arr) != 2 {
+		return ctx
+	}
+	return SetProject(SetDomain(ctx, arr[0]), arr[1])
+}
+
 func SetTargetDomainProject(ctx context.Context, domain string, project string) context.Context {
 	return SetTargetProject(SetTargetDomain(ctx, domain), project)
 }
diff --git a/server/connection/grpc/stream.go b/server/connection/grpc/stream.go
index b74acc3..b786c8f 100644
--- a/server/connection/grpc/stream.go
+++ b/server/connection/grpc/stream.go
@@ -28,12 +28,11 @@ import (
 	"github.com/apache/servicecomb-service-center/server/connection"
 	"github.com/apache/servicecomb-service-center/server/event"
 	"github.com/apache/servicecomb-service-center/server/metrics"
-	pb "github.com/go-chassis/cari/discovery"
 )
 
 const GRPC = "gRPC"
 
-func Handle(watcher *event.InstanceEventListWatcher, stream proto.ServiceInstanceCtrlWatchServer) (err error) {
+func Handle(watcher *event.InstanceSubscriber, stream proto.ServiceInstanceCtrlWatchServer) (err error) {
 	timer := time.NewTimer(connection.HeartbeatInterval)
 	defer timer.Stop()
 	for {
@@ -69,10 +68,13 @@ func Handle(watcher *event.InstanceEventListWatcher, stream proto.ServiceInstanc
 	}
 }
 
-func ListAndWatch(ctx context.Context, serviceID string, f func() ([]*pb.WatchInstanceResponse, int64), stream proto.ServiceInstanceCtrlWatchServer) (err error) {
+// Watch creates an InstanceSubscriber for serviceID in the domain/project
+// carried by ctx and registers it with the event center, returning early
+// with the registration error if that fails.
+func Watch(ctx context.Context, serviceID string, stream proto.ServiceInstanceCtrlWatchServer) (err error) {
 	domainProject := util.ParseDomainProject(ctx)
 	domain := util.ParseDomain(ctx)
-	watcher := event.NewInstanceEventListWatcher(serviceID, domainProject, f)
+	watcher := event.NewInstanceSubscriber(serviceID, domainProject)
 	err = event.Center().AddSubscriber(watcher)
 	if err != nil {
 		return
diff --git a/server/connection/grpc/stream_test.go b/server/connection/grpc/stream_test.go
index 3381167..3bb7d78 100644
--- a/server/connection/grpc/stream_test.go
+++ b/server/connection/grpc/stream_test.go
@@ -42,7 +42,7 @@ func (x *grpcWatchServer) Context() context.Context {
 }
 
 func TestHandleWatchJob(t *testing.T) {
-	w := event.NewInstanceEventListWatcher("g", "s", nil)
+	w := event.NewInstanceSubscriber("g", "s")
 	w.Job <- nil
 	err := stream.Handle(w, &grpcWatchServer{})
 	if err == nil {
@@ -55,6 +55,8 @@ func TestHandleWatchJob(t *testing.T) {
 
 func TestDoStreamListAndWatch(t *testing.T) {
 	defer log.Recover()
-	err := stream.ListAndWatch(context.Background(), "s", nil, nil)
+	// t.Fatal is reached only if Watch returns; the test presumably relies on
+	// Watch panicking on the nil stream, with log.Recover absorbing the panic.
+	err := stream.Watch(context.Background(), "s", nil)
 	t.Fatal("TestDoStreamListAndWatch failed", err)
 }
diff --git a/server/connection/ws/broker.go b/server/connection/ws/broker.go
new file mode 100644
index 0000000..1501945
--- /dev/null
+++ b/server/connection/ws/broker.go
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ws
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	"github.com/apache/servicecomb-service-center/pkg/util"
+	"github.com/apache/servicecomb-service-center/server/event"
+	"github.com/apache/servicecomb-service-center/server/metrics"
+	pb "github.com/go-chassis/cari/discovery"
+)
+
+var (
+	// errChanClosed is returned by Listen when the subscriber's Job channel
+	// has been closed.
+	errChanClosed = errors.New("chan closed")
+
+	// errServerShutdown is returned by Listen when a nil event is received on
+	// the Job channel; it cannot be written and is treated as a stop signal.
+	errServerShutdown = errors.New("server shutdown")
+)
+
+// Broker forwards the instance events received by an InstanceSubscriber
+// (producer) to a WebSocket connection (consumer).
+type Broker struct {
+	consumer *WebSocket
+	producer *event.InstanceSubscriber
+}
+
+// Listen writes every event received on the producer's Job channel to the
+// consumer. It returns when ctx is done, the channel is closed or yields a
+// nil event, or a write fails.
+func (b *Broker) Listen(ctx context.Context) error {
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case instanceEvent, ok := <-b.producer.Job:
+			if !ok {
+				return errChanClosed
+			}
+			if instanceEvent == nil {
+				return errServerShutdown
+			}
+			if err := b.write(instanceEvent); err != nil {
+				return err
+			}
+		}
+	}
+}
+
+// write sends evt's response to the consumer as a JSON text message and
+// reports the outcome to metrics. If marshalling fails, the error text is
+// sent instead.
+func (b *Broker) write(evt *event.InstanceEvent) error {
+	resp := evt.Response
+	providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, resp.Key.ServiceName, resp.Key.Version)
+	if resp.Action != string(pb.EVT_EXPIRE) {
+		providerFlag = fmt.Sprintf("%s/%s(%s)", resp.Instance.ServiceId, resp.Instance.InstanceId, providerFlag)
+	}
+	remoteAddr := b.consumer.Conn.RemoteAddr().String()
+	log.Infof("event[%s] is coming in, subscriber[%s] watch %s, group: %s",
+		resp.Action, remoteAddr, providerFlag, b.producer.Group())
+
+	// Marshal a shallow copy with Response cleared rather than clearing it on
+	// the event: the response object may be shared (e.g. by other subscribers
+	// of the same event), and mutating it here would race with them.
+	out := *resp
+	out.Response = nil
+	data, err := json.Marshal(&out)
+	if err != nil {
+		log.Errorf(err, "subscriber[%s] watch %s, group: %s", remoteAddr, providerFlag, b.producer.Group())
+		data = util.StringToBytesWithNoCopy(fmt.Sprintf("marshal output file error, %s", err.Error()))
+	}
+	err = b.consumer.WriteTextMessage(data)
+	metrics.ReportPublishCompleted(evt, err)
+	return err
+}
+
+// NewBroker returns a Broker that forwards the events of is to ws.
+func NewBroker(ws *WebSocket, is *event.InstanceSubscriber) *Broker {
+	return &Broker{
+		consumer: ws,
+		producer: is,
+	}
+}
diff --git a/pkg/proto/service_ex.go b/server/connection/ws/broker_test.go
similarity index 55%
copy from pkg/proto/service_ex.go
copy to server/connection/ws/broker_test.go
index ce955d3..a3597c3 100644
--- a/pkg/proto/service_ex.go
+++ b/server/connection/ws/broker_test.go
@@ -15,22 +15,29 @@
  * limitations under the License.
  */
 
-package proto
+package ws_test
 
 import (
 	"context"
-
-	"github.com/go-chassis/cari/discovery"
-	"github.com/gorilla/websocket"
+	"testing"
+
+	"github.com/apache/servicecomb-service-center/server/connection/ws"
+	"github.com/apache/servicecomb-service-center/server/event"
+	"github.com/stretchr/testify/assert"
 )
 
-type ServiceInstanceCtrlServerEx interface {
-	ServiceInstanceCtrlServer
-
-	BatchFind(ctx context.Context, in *discovery.BatchFindInstancesRequest) (*discovery.BatchFindInstancesResponse, error)
+func TestNewBroker(t *testing.T) {
+	t.Run("should not return nil when new broker", func(t *testing.T) {
+		assert.NotNil(t, ws.NewBroker(nil, nil))
 
-	WebSocketWatch(ctx context.Context, in *discovery.WatchInstanceRequest, conn *websocket.Conn)
-	WebSocketListAndWatch(ctx context.Context, in *discovery.WatchInstanceRequest, conn *websocket.Conn)
+	})
+}
 
-	ClusterHealth(ctx context.Context) (*discovery.GetInstancesResponse, error)
+func TestBroker_Listen(t *testing.T) {
+	t.Run("should return err when listen context cancelled", func(t *testing.T) {
+		broker := ws.NewBroker(nil, event.NewInstanceSubscriber("", ""))
+		ctx, cancel := context.WithCancel(context.Background())
+		cancel()
+		assert.Equal(t, context.Canceled, broker.Listen(ctx))
+	})
 }
diff --git a/server/connection/ws/common.go b/server/connection/ws/common.go
new file mode 100644
index 0000000..d9e709c
--- /dev/null
+++ b/server/connection/ws/common.go
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ws
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/apache/servicecomb-service-center/pkg/gopool"
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	"github.com/apache/servicecomb-service-center/pkg/util"
+	"github.com/apache/servicecomb-service-center/server/event"
+	"github.com/apache/servicecomb-service-center/server/metrics"
+	"github.com/gorilla/websocket"
+)
+
+// Watch serves a websocket watch for the consumer service serviceID in the
+// domain/project carried by ctx. It subscribes to instance events, forwards
+// them to conn from a separate goroutine, and blocks reading conn until a
+// read fails. The websocket is health-checked only while Watch runs.
+func Watch(ctx context.Context, serviceID string, conn *websocket.Conn) {
+	domainProject := util.ParseDomainProject(ctx)
+	domain := util.ParseDomain(ctx)
+
+	ws := NewWebSocket(domainProject, serviceID, conn)
+	// NewWebSocket starts a heartbeat ticker; release it when the watch ends.
+	defer ws.ticker.Stop()
+
+	subscriber := event.NewInstanceSubscriber(serviceID, domainProject)
+	err := event.Center().AddSubscriber(subscriber)
+	if err != nil {
+		SendEstablishError(conn, err)
+		return
+	}
+
+	// Register with the health checker only once subscribed, and always
+	// unregister on return: a client that sends its own Pings never fails
+	// CheckHealth, so it would otherwise stay in the checker forever.
+	HealthChecker().Accept(ws)
+	defer HealthChecker().Remove(ws)
+
+	metrics.ReportSubscriber(domain, Websocket, 1)
+	defer metrics.ReportSubscriber(domain, Websocket, -1)
+
+	pool := gopool.New(ctx).Do(func(ctx context.Context) {
+		if err := NewBroker(ws, subscriber).Listen(ctx); err != nil {
+			log.Error(fmt.Sprintf("[%s] listen service[%s] failed", conn.RemoteAddr(), serviceID), err)
+		}
+	})
+	defer pool.Done()
+
+	if err := ws.ReadMessage(); err != nil {
+		log.Error(fmt.Sprintf("read subscriber[%s][%s] message failed", serviceID, conn.RemoteAddr()), err)
+		subscriber.SetError(err)
+	}
+}
+
+// SendEstablishError logs err and writes its text to conn as a text message;
+// a failure of that write is logged, not returned.
+func SendEstablishError(conn *websocket.Conn, err error) {
+	remoteAddr := conn.RemoteAddr().String()
+	log.Errorf(err, "establish[%s] websocket watch failed.", remoteAddr)
+	if err := conn.WriteMessage(websocket.TextMessage, util.StringToBytesWithNoCopy(err.Error())); err != nil {
+		log.Errorf(err, "establish[%s] websocket watch failed: write message failed.", remoteAddr)
+	}
+}
diff --git a/pkg/proto/service_ex.go b/server/connection/ws/common_test.go
similarity index 53%
copy from pkg/proto/service_ex.go
copy to server/connection/ws/common_test.go
index ce955d3..df8ae55 100644
--- a/pkg/proto/service_ex.go
+++ b/server/connection/ws/common_test.go
@@ -15,22 +15,35 @@
  * limitations under the License.
  */
 
-package proto
+package ws_test
 
 import (
 	"context"
+	"errors"
+	"testing"
 
-	"github.com/go-chassis/cari/discovery"
-	"github.com/gorilla/websocket"
+	wss "github.com/apache/servicecomb-service-center/server/connection/ws"
+	"github.com/stretchr/testify/assert"
 )
 
-type ServiceInstanceCtrlServerEx interface {
-	ServiceInstanceCtrlServer
-
-	BatchFind(ctx context.Context, in *discovery.BatchFindInstancesRequest) (*discovery.BatchFindInstancesResponse, error)
-
-	WebSocketWatch(ctx context.Context, in *discovery.WatchInstanceRequest, conn *websocket.Conn)
-	WebSocketListAndWatch(ctx context.Context, in *discovery.WatchInstanceRequest, conn *websocket.Conn)
+func TestSendEstablishError(t *testing.T) {
+	mock := NewTest()
+	t.Run("should read the err when call", func(t *testing.T) {
+		wss.SendEstablishError(mock.ServerConn, errors.New("error"))
+		_, message, err := mock.ClientConn.ReadMessage()
+		assert.Nil(t, err)
+		assert.Equal(t, "error", string(message))
+	})
+}
 
-	ClusterHealth(ctx context.Context) (*discovery.GetInstancesResponse, error)
+func TestWatch(t *testing.T) {
+	t.Run("should return when ctx cancelled", func(t *testing.T) {
+		mock := NewTest()
+		// Watch returns once ws.ReadMessage fails; closing the server side first
+		// makes that immediate (Watch's read loop does not observe ctx).
+		mock.ServerConn.Close()
+		ctx, cancel := context.WithCancel(context.Background())
+		cancel()
+		wss.Watch(ctx, "", mock.ServerConn)
+	})
 }
diff --git a/server/connection/ws/publisher.go b/server/connection/ws/health_check.go
similarity index 57%
rename from server/connection/ws/publisher.go
rename to server/connection/ws/health_check.go
index 267ff24..aeb4f38 100644
--- a/server/connection/ws/publisher.go
+++ b/server/connection/ws/health_check.go
@@ -19,40 +19,43 @@ package ws
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"time"
 
 	"github.com/apache/servicecomb-service-center/pkg/gopool"
+	"github.com/apache/servicecomb-service-center/pkg/log"
 )
 
-var publisher *Publisher
+var checker *HealthCheck
 
 func init() {
-	publisher = NewPublisher()
-	publisher.Run()
+	checker = NewHealthCheck()
+	checker.Run()
 }
 
-type Publisher struct {
+// HealthCheck periodically health-checks the registered websockets (see
+// WebSocket.CheckHealth) and unregisters those whose check fails.
+type HealthCheck struct {
 	wss       []*WebSocket
 	lock      sync.Mutex
 	goroutine *gopool.Pool
 }
 
-func (wh *Publisher) Run() {
-	gopool.Go(publisher.loop)
+// Run starts the check loop of this HealthCheck in a background goroutine.
+func (wh *HealthCheck) Run() {
+	gopool.Go(wh.loop)
 }
 
-func (wh *Publisher) Stop() {
+// Stop closes the pool that runs the individual health checks.
+func (wh *HealthCheck) Stop() {
 	wh.goroutine.Close(true)
 }
 
-func (wh *Publisher) dispatch(ws *WebSocket, payload interface{}) {
-	wh.goroutine.Do(func(ctx context.Context) {
-		ws.HandleEvent(payload)
-	})
-}
-
-func (wh *Publisher) loop(ctx context.Context) {
+// loop runs a check for every due websocket on each 500ms tick until it exits
+// on server shutdown, and then stops the checker.
+func (wh *HealthCheck) loop(ctx context.Context) {
 	defer wh.Stop()
 	ticker := time.NewTicker(500 * time.Millisecond)
+	defer ticker.Stop()
 	for {
@@ -61,49 +64,71 @@ func (wh *Publisher) loop(ctx context.Context) {
 			// server shutdown
 			return
 		case <-ticker.C:
-			var removes []int
-			for i, ws := range wh.wss {
-				if payload := ws.Pick(); payload != nil {
-					if _, ok := payload.(error); ok {
-						removes = append(removes, i)
-					}
-					wh.dispatch(ws, payload)
+			// Iterate over a copy taken under the lock: Accept and Remove (which
+			// also runs from check goroutines) modify wh.wss concurrently.
+			wh.lock.Lock()
+			wss := append([]*WebSocket(nil), wh.wss...)
+			wh.lock.Unlock()
+			for _, ws := range wss {
+				if t := ws.NeedCheck(); t == nil {
+					continue
 				}
+				wh.check(ws)
 			}
-			if len(removes) == 0 {
-				continue
-			}
-
-			wh.lock.Lock()
-			var (
-				news []*WebSocket
-				s    int
-			)
-			for _, e := range removes {
-				news = append(news, wh.wss[s:e]...)
-				s = e + 1
-			}
-			if s < len(wh.wss) {
-				news = append(news, wh.wss[s:]...)
-			}
-			wh.wss = news
-			wh.lock.Unlock()
 		}
 	}
 }
 
-func (wh *Publisher) Accept(ws *WebSocket) {
+// check runs ws.CheckHealth on the checker's pool and unregisters ws when the
+// check fails.
+func (wh *HealthCheck) check(ws *WebSocket) {
+	wh.goroutine.Do(func(ctx context.Context) {
+		if err := ws.CheckHealth(ctx); err != nil {
+			wh.Remove(ws)
+			log.Error(fmt.Sprintf("checker removed unhealth websocket[%s]", ws.RemoteAddr), err)
+		}
+	})
+}
+
+// Accept registers ws for health checks and returns the number of registered
+// websockets.
+func (wh *HealthCheck) Accept(ws *WebSocket) int {
 	wh.lock.Lock()
 	wh.wss = append(wh.wss, ws)
+	n := len(wh.wss)
+	wh.lock.Unlock()
+	return n
+}
+
+// Remove unregisters ws if it is present and returns the number of websockets
+// still registered.
+func (wh *HealthCheck) Remove(ws *WebSocket) int {
+	wh.lock.Lock()
+	for i, t := range wh.wss {
+		if t == ws {
+			// Shift the tail left and clear the vacated last slot so the
+			// backing array does not keep the removed websocket alive.
+			last := len(wh.wss) - 1
+			copy(wh.wss[i:], wh.wss[i+1:])
+			wh.wss[last] = nil
+			wh.wss = wh.wss[:last]
+			break
+		}
+	}
+	n := len(wh.wss)
 	wh.lock.Unlock()
+	return n
 }
 
-func NewPublisher() *Publisher {
-	return &Publisher{
+// NewHealthCheck returns a HealthCheck with its own goroutine pool; call Run
+// to start checking.
+func NewHealthCheck() *HealthCheck {
+	return &HealthCheck{
 		goroutine: gopool.New(context.Background()),
 	}
 }
 
-func Instance() *Publisher {
-	return publisher
+// HealthChecker returns the package-level checker started by init.
+func HealthChecker() *HealthCheck {
+	return checker
 }
diff --git a/server/rest/controller/v3/instance_watcher.go b/server/connection/ws/health_check_test.go
similarity index 58%
copy from server/rest/controller/v3/instance_watcher.go
copy to server/connection/ws/health_check_test.go
index 7395c4c..7ab0578 100644
--- a/server/rest/controller/v3/instance_watcher.go
+++ b/server/connection/ws/health_check_test.go
@@ -14,20 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package v3
+
+package ws_test
 
 import (
-	"github.com/apache/servicecomb-service-center/pkg/rest"
-	"github.com/apache/servicecomb-service-center/server/rest/controller/v4"
+	"testing"
+
+	"github.com/apache/servicecomb-service-center/server/connection/ws"
+	"github.com/stretchr/testify/assert"
 )
 
-type WatchService struct {
-	v4.WatchService
+func TestNewHealthCheck(t *testing.T) {
+	t.Run("should not return nil when new", func(t *testing.T) {
+		assert.NotNil(t, ws.NewHealthCheck())
+	})
 }
 
-func (this *WatchService) URLPatterns() []rest.Route {
-	return []rest.Route{
-		{rest.HTTPMethodGet, "/registry/v3/microservices/:serviceId/watcher", this.Watch},
-		{rest.HTTPMethodGet, "/registry/v3/microservices/:serviceId/listwatcher", this.ListAndWatch},
-	}
+func TestHealthCheck_AcceptRemove(t *testing.T) {
+	mock := NewTest()
+
+	t.Run("should return 1 when accept one ws", func(t *testing.T) {
+		check := ws.NewHealthCheck()
+		webSocket := ws.NewWebSocket("", "", mock.ServerConn)
+		assert.Equal(t, 1, check.Accept(webSocket))
+		assert.Equal(t, 0, check.Remove(webSocket))
+	})
 }
diff --git a/server/rest/controller/v3/instance_watcher.go b/server/connection/ws/options.go
similarity index 64%
copy from server/rest/controller/v3/instance_watcher.go
copy to server/connection/ws/options.go
index 7395c4c..3196623 100644
--- a/server/rest/controller/v3/instance_watcher.go
+++ b/server/connection/ws/options.go
@@ -14,20 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package v3
+
+package ws
 
 import (
-	"github.com/apache/servicecomb-service-center/pkg/rest"
-	"github.com/apache/servicecomb-service-center/server/rest/controller/v4"
+	"time"
+
+	"github.com/apache/servicecomb-service-center/server/connection"
 )
 
-type WatchService struct {
-	v4.WatchService
+// Options holds the timeouts a WebSocket applies to its connection.
+type Options struct {
+	ReadTimeout    time.Duration // read deadline, set before reading and renewed on Ping/Pong
+	SendTimeout    time.Duration // deadline for text and control frame writes
+	HealthInterval time.Duration // period of the health-check ticker
 }
 
-func (this *WatchService) URLPatterns() []rest.Route {
-	return []rest.Route{
-		{rest.HTTPMethodGet, "/registry/v3/microservices/:serviceId/watcher", this.Watch},
-		{rest.HTTPMethodGet, "/registry/v3/microservices/:serviceId/listwatcher", this.ListAndWatch},
+// ToOptions returns Options built from connection.ReadTimeout, SendTimeout and HeartbeatInterval.
+func ToOptions() Options {
+	return Options{
+		ReadTimeout:    connection.ReadTimeout,
+		SendTimeout:    connection.SendTimeout,
+		HealthInterval: connection.HeartbeatInterval,
 	}
 }
diff --git a/server/connection/ws/websocket.go b/server/connection/ws/websocket.go
index 57b064c..e9fe3c1 100644
--- a/server/connection/ws/websocket.go
+++ b/server/connection/ws/websocket.go
@@ -19,311 +19,209 @@ package ws
 
 import (
 	"context"
-	"encoding/json"
 	"fmt"
+	"sync/atomic"
 	"time"
 
 	"github.com/apache/servicecomb-service-center/datasource"
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 	"github.com/apache/servicecomb-service-center/server/connection"
-	"github.com/apache/servicecomb-service-center/server/event"
-	"github.com/apache/servicecomb-service-center/server/metrics"
 	pb "github.com/go-chassis/cari/discovery"
 	"github.com/gorilla/websocket"
 )
 
 const Websocket = "Websocket"
 
-type WebSocket struct {
-	ctx    context.Context
-	ticker *time.Ticker
-	conn   *websocket.Conn
-	// watcher subscribe the notification service event
-	watcher         *event.InstanceEventListWatcher
-	needPingWatcher bool
-	free            chan struct{}
-	closed          chan struct{}
-}
-
-func (wh *WebSocket) Init() error {
-	wh.ticker = time.NewTicker(connection.HeartbeatInterval)
-	wh.needPingWatcher = true
-	wh.free = make(chan struct{}, 1)
-	wh.closed = make(chan struct{})
+// errServiceNotExist is returned by CheckHealth when the consumer service is
+// missing or its existence cannot be checked.
+var errServiceNotExist = fmt.Errorf("Service does not exist.")
 
-	wh.SetReady()
-
-	remoteAddr := wh.conn.RemoteAddr().String()
-
-	// put in notification service queue
-	if err := event.Center().AddSubscriber(wh.watcher); err != nil {
-		err = fmt.Errorf("establish[%s] websocket watch failed: notify service error, %s",
-			remoteAddr, err.Error())
-		log.Errorf(nil, err.Error())
-
-		werr := wh.conn.WriteMessage(websocket.TextMessage, util.StringToBytesWithNoCopy(err.Error()))
-		if werr != nil {
-			log.Errorf(werr, "establish[%s] websocket watch failed: write message failed.", remoteAddr)
-		}
-		return err
-	}
-
-	// put in publisher queue
-	Instance().Accept(wh)
+// WebSocket wraps the watch connection opened by the consumer service
+// ConsumerID, plus the state the health checker uses for it.
+type WebSocket struct {
+	Options
+	Conn          *websocket.Conn
+	RemoteAddr    string
+	DomainProject string
+	ConsumerID    string
 
-	log.Debugf("start watching instance status, watcher[%s], subject: %s, group: %s",
-		remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
-	return nil
+	ticker   *time.Ticker
+	needPing int32 // 1 until the client sends a Ping; accessed atomically
+	idleCh   chan struct{}
 }
 
-func (wh *WebSocket) ReadTimeout() time.Duration {
-	return connection.ReadTimeout
-}
+// Init derives RemoteAddr from Conn, starts the health-check ticker, installs
+// the control-message handlers and marks the socket idle.
+func (wh *WebSocket) Init() {
+	wh.RemoteAddr = wh.Conn.RemoteAddr().String()
+	wh.ticker = time.NewTicker(wh.HealthInterval)
+	atomic.StoreInt32(&wh.needPing, 1)
+	wh.idleCh = make(chan struct{}, 1)
 
-func (wh *WebSocket) SendTimeout() time.Duration {
-	return connection.SendTimeout
-}
+	wh.registerMessageHandler()
 
-func (wh *WebSocket) Heartbeat(messageType int) error {
-	err := wh.conn.WriteControl(messageType, []byte{}, time.Now().Add(wh.SendTimeout()))
-	if err != nil {
-		messageTypeName := "Ping"
-		if messageType == websocket.PongMessage {
-			messageTypeName = "Pong"
-		}
-		log.Errorf(err, "fail to send '%s' to watcher[%s], subject: %s, group: %s",
-			messageTypeName, wh.conn.RemoteAddr(), wh.watcher.Subject(), wh.watcher.Group())
-		//wh.watcher.SetError(err)
-		return err
-	}
-	return nil
+	wh.SetIdle()
+
+	log.Debugf("start watching instance status, subscriber[%s], consumer: %s",
+		wh.RemoteAddr, wh.ConsumerID)
 }
 
-func (wh *WebSocket) HandleControlMessage() {
-	remoteAddr := wh.conn.RemoteAddr().String()
+// registerMessageHandler installs the Ping, Pong and Close handlers on Conn.
+func (wh *WebSocket) registerMessageHandler() {
+	remoteAddr := wh.RemoteAddr
 	// PING
-	wh.conn.SetPingHandler(func(message string) error {
+	wh.Conn.SetPingHandler(func(message string) error {
 		defer func() {
-			err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
+			err := wh.Conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
 			if err != nil {
 				log.Error("", err)
 			}
 		}()
-		if wh.needPingWatcher {
-			log.Infof("received 'Ping' message '%s' from watcher[%s], no longer send 'Ping' to it, subject: %s, group: %s",
-				message, remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
+		if atomic.SwapInt32(&wh.needPing, 0) == 1 {
+			log.Infof("received 'Ping' message '%s' from subscriber[%s], no longer send 'Ping' to it, consumer: %s",
+				message, remoteAddr, wh.ConsumerID)
 		}
-		wh.needPingWatcher = false
-		return wh.Heartbeat(websocket.PongMessage)
+		return wh.WritePingPong(websocket.PongMessage)
 	})
 	// PONG
-	wh.conn.SetPongHandler(func(message string) error {
+	wh.Conn.SetPongHandler(func(message string) error {
 		defer func() {
-			err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
+			err := wh.Conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
 			if err != nil {
 				log.Error("", err)
 			}
 		}()
-		log.Debugf("received 'Pong' message '%s' from watcher[%s], subject: %s, group: %s",
-			message, remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
+		log.Debugf("received 'Pong' message '%s' from subscriber[%s], consumer: %s",
+			message, remoteAddr, wh.ConsumerID)
 		return nil
 	})
 	// CLOSE
-	wh.conn.SetCloseHandler(func(code int, text string) error {
-		log.Infof("watcher[%s] active closed, code: %d, message: '%s', subject: %s, group: %s",
-			remoteAddr, code, text, wh.watcher.Subject(), wh.watcher.Group())
+	wh.Conn.SetCloseHandler(func(code int, text string) error {
+		log.Infof("subscriber[%s] active closed, code: %d, message: '%s', consumer: %s",
+			remoteAddr, code, text, wh.ConsumerID)
 		return wh.sendClose(code, text)
 	})
+}
 
-	wh.conn.SetReadLimit(connection.ReadMaxBody)
-	err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
+// ReadMessage applies the read limit and deadline, then reads and discards
+// messages from Conn until a read fails, returning that error.
+func (wh *WebSocket) ReadMessage() error {
+	wh.Conn.SetReadLimit(connection.ReadMaxBody)
+	err := wh.Conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
 	if err != nil {
 		log.Error("", err)
 	}
 	for {
-		_, _, err := wh.conn.ReadMessage()
+		_, _, err := wh.Conn.ReadMessage()
 		if err != nil {
-			// client close or conn broken
-			wh.watcher.SetError(err)
-			return
+			return err
 		}
 	}
 }
 
+// sendClose writes a Close control frame with code and text (no payload for CloseNoStatusReceived).
 func (wh *WebSocket) sendClose(code int, text string) error {
-	remoteAddr := wh.conn.RemoteAddr().String()
+	remoteAddr := wh.Conn.RemoteAddr().String()
 	var message []byte
 	if code != websocket.CloseNoStatusReceived {
 		message = websocket.FormatCloseMessage(code, text)
 	}
-	err := wh.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(wh.SendTimeout()))
+	err := wh.Conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(wh.SendTimeout))
 	if err != nil {
-		log.Errorf(err, "watcher[%s] catch an err, subject: %s, group: %s",
-			remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
+		log.Errorf(err, "subscriber[%s] catch an err, consumer: %s",
+			remoteAddr, wh.ConsumerID)
 		return err
 	}
 	return nil
 }
 
-// Pick will be called by publisher
-func (wh *WebSocket) Pick() interface{} {
+// NeedCheck is polled by the health checker. When the socket is idle and its
+// ticker has fired it returns the tick time and leaves the socket busy until
+// CheckHealth marks it idle again; otherwise it returns nil.
+func (wh *WebSocket) NeedCheck() interface{} {
 	select {
-	case <-wh.Ready():
-		if wh.watcher.Err() != nil {
-			return wh.watcher.Err()
-		}
-
+	case <-wh.Idle():
 		select {
 		case t := <-wh.ticker.C:
 			return t
-		case j := <-wh.watcher.Job:
-			if j == nil {
-				return fmt.Errorf("server shutdown")
-			}
-			return j
 		default:
-			// reset if idle
-			wh.SetReady()
+			// no tick pending: give the idle token back
+			wh.SetIdle()
 		}
 	default:
 	}
-
 	return nil
 }
 
-// HandleEvent will be called if Pick() returns not nil
-func (wh *WebSocket) HandleEvent(o interface{}) {
-	defer wh.SetReady()
-
-	var (
-		message    []byte
-		remoteAddr = wh.conn.RemoteAddr().String()
-	)
-
-	switch o := o.(type) {
-	case error:
-		log.Errorf(o, "watcher[%s] catch an err, subject: %s, group: %s",
-			remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
-
-		message = util.StringToBytesWithNoCopy(fmt.Sprintf("watcher catch an err: %s", o.Error()))
-	case time.Time:
-		if exist, err := datasource.Instance().ExistServiceByID(wh.ctx, &pb.GetExistenceByIDRequest{
-			ServiceId: wh.watcher.Group(),
-		}); err != nil || !exist.Exist {
-			message = util.StringToBytesWithNoCopy("Service does not exit.")
-			break
-		}
-
-		if !wh.needPingWatcher {
-			return
-		}
+// CheckHealth is run by the health checker when NeedCheck returns non-nil.
+// Unless the client pings us itself, it verifies the consumer service still
+// exists and sends a Ping; a non-nil error means the socket is unhealthy.
+func (wh *WebSocket) CheckHealth(ctx context.Context) error {
+	defer wh.SetIdle()
 
-		if err := wh.Heartbeat(websocket.PingMessage); err != nil {
-			log.Errorf(err, "send 'Ping' message to watcher[%s] failed, subject: %s, group: %s",
-				remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
-			return
-		}
-
-		log.Debugf("send 'Ping' message to watcher[%s], subject: %s, group: %s",
-			remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
-		return
-	case *event.InstanceEvent:
-		resp := o.Response
+	if atomic.LoadInt32(&wh.needPing) == 0 {
+		return nil
+	}
 
-		providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, resp.Key.ServiceName, resp.Key.Version)
-		if resp.Action != string(pb.EVT_EXPIRE) {
-			providerFlag = fmt.Sprintf("%s/%s(%s)", resp.Instance.ServiceId, resp.Instance.InstanceId, providerFlag)
-		}
-		log.Infof("event[%s] is coming in, watcher[%s] watch %s, subject: %s, group: %s",
-			resp.Action, remoteAddr, providerFlag, wh.watcher.Subject(), wh.watcher.Group())
+	ctx = util.SetDomainProjectString(ctx, wh.DomainProject)
 
-		resp.Response = nil
-		data, err := json.Marshal(resp)
-		if err != nil {
-			log.Errorf(err, "watcher[%s] watch %s, subject: %s, group: %s",
-				remoteAddr, providerFlag, o, wh.watcher.Subject(), wh.watcher.Group())
-			message = util.StringToBytesWithNoCopy(fmt.Sprintf("marshal output file error, %s", err.Error()))
-			break
-		}
-		message = data
-	default:
-		log.Errorf(nil, "watcher[%s] unknown input %v, subject: %s, group: %s",
-			remoteAddr, o, wh.watcher.Subject(), wh.watcher.Group())
-		return
+	if exist, err := datasource.Instance().ExistServiceByID(ctx, &pb.GetExistenceByIDRequest{
+		ServiceId: wh.ConsumerID,
+	}); err != nil || !exist.Exist {
+		return errServiceNotExist
 	}
 
-	select {
-	case <-wh.closed:
-		return
-	default:
+	remoteAddr := wh.Conn.RemoteAddr().String()
+	if err := wh.WritePingPong(websocket.PingMessage); err != nil {
+		return err
 	}
 
-	err := wh.WriteMessage(message)
-	if evt, ok := o.(*event.InstanceEvent); ok {
-		metrics.ReportPublishCompleted(evt, err)
-	}
-	if err != nil {
-		log.Errorf(err, "watcher[%s] catch an err, subject: %s, group: %s",
-			remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
-	}
+	log.Debugf("send 'Ping' message to subscriber[%s], consumer: %s",
+		remoteAddr, wh.ConsumerID)
+	return nil
+}
+
+// WritePingPong writes an empty control frame of messageType within SendTimeout.
+func (wh *WebSocket) WritePingPong(messageType int) error {
+	return wh.Conn.WriteControl(messageType, []byte{}, time.Now().Add(wh.SendTimeout))
 }
 
-func (wh *WebSocket) WriteMessage(message []byte) error {
-	err := wh.conn.SetWriteDeadline(time.Now().Add(wh.SendTimeout()))
+// WriteTextMessage writes message as a text frame within SendTimeout, logging write failures.
+func (wh *WebSocket) WriteTextMessage(message []byte) error {
+	err := wh.Conn.SetWriteDeadline(time.Now().Add(wh.SendTimeout))
 	if err != nil {
 		return err
 	}
-	return wh.conn.WriteMessage(websocket.TextMessage, message)
+	err = wh.Conn.WriteMessage(websocket.TextMessage, message)
+	if err != nil {
+		log.Errorf(err, "subscriber[%s] catch an err, msg size: %d",
+			wh.Conn.RemoteAddr().String(), len(message))
+	}
+	return err
 }
 
-func (wh *WebSocket) Ready() <-chan struct{} {
-	return wh.free
+// Idle returns the channel that holds the socket's idle token.
+func (wh *WebSocket) Idle() <-chan struct{} {
+	return wh.idleCh
 }
 
-func (wh *WebSocket) SetReady() {
+// SetIdle stores the idle token, doing nothing if it is already present.
+func (wh *WebSocket) SetIdle() {
 	select {
-	case wh.free <- struct{}{}:
+	case wh.idleCh <- struct{}{}:
 	default:
 	}
 }
 
-func (wh *WebSocket) Stop() {
-	close(wh.closed)
-}
-
-func ListAndWatch(ctx context.Context, serviceID string, f func() ([]*pb.WatchInstanceResponse, int64), conn *websocket.Conn) {
-	domainProject := util.ParseDomainProject(ctx)
-	domain := util.ParseDomain(ctx)
-	socket := New(ctx, conn, event.NewInstanceEventListWatcher(serviceID, domainProject, f))
-
-	metrics.ReportSubscriber(domain, Websocket, 1)
-	process(socket)
-	metrics.ReportSubscriber(domain, Websocket, -1)
-}
-
-func process(socket *WebSocket) {
-	if err := socket.Init(); err != nil {
-		return
-	}
-
-	socket.HandleControlMessage()
-
-	socket.Stop()
-}
-
-func SendEstablishError(conn *websocket.Conn, err error) {
-	remoteAddr := conn.RemoteAddr().String()
-	log.Errorf(err, "establish[%s] websocket watch failed.", remoteAddr)
-	if err := conn.WriteMessage(websocket.TextMessage, util.StringToBytesWithNoCopy(err.Error())); err != nil {
-		log.Errorf(err, "establish[%s] websocket watch failed: write message failed.", remoteAddr)
-	}
-}
-
-func New(ctx context.Context, conn *websocket.Conn, watcher *event.InstanceEventListWatcher) *WebSocket {
-	return &WebSocket{
-		ctx:     ctx,
-		conn:    conn,
-		watcher: watcher,
+// NewWebSocket returns an initialized WebSocket for conn using ToOptions().
+func NewWebSocket(domainProject, serviceID string, conn *websocket.Conn) *WebSocket {
+	ws := &WebSocket{
+		Options:       ToOptions(),
+		DomainProject: domainProject,
+		ConsumerID:    serviceID,
+		Conn:          conn,
 	}
+	ws.Init()
+	return ws
 }
diff --git a/server/connection/ws/websocket_test.go b/server/connection/ws/websocket_test.go
index 7f21727..ca40bd6 100644
--- a/server/connection/ws/websocket_test.go
+++ b/server/connection/ws/websocket_test.go
@@ -16,12 +16,10 @@
  */
 package ws_test
 
-// initialize
 import (
 	_ "github.com/apache/servicecomb-service-center/test"
 
 	"context"
-	"errors"
 	"net/http"
 	"net/http/httptest"
 	"strings"
@@ -31,104 +29,149 @@ import (
 	wss "github.com/apache/servicecomb-service-center/server/connection/ws"
 	"github.com/apache/servicecomb-service-center/server/core"
 	"github.com/apache/servicecomb-service-center/server/event"
-	"github.com/go-chassis/cari/discovery"
 	"github.com/gorilla/websocket"
+	"github.com/stretchr/testify/assert"
 )
 
 var closeCh = make(chan struct{})
 
-type watcherConn struct {
-}
-
 func init() {
 	testing.Init()
 	core.Initialize()
 }
+
+// watcherConn is a test websocket server that dials itself, exposing both the
+// client and the server end of the resulting connection.
+type watcherConn struct {
+	ClientConn *websocket.Conn
+	ServerConn *websocket.Conn
+	// ready is closed by ServeHTTP once ServerConn has been assigned.
+	ready chan struct{}
+}
+
+// Test starts an httptest server backed by h and dials it. On a successful dial
+// it waits for ServeHTTP to publish ServerConn, so callers never race on it.
+// NOTE(review): the httptest server is never closed.
+func (h *watcherConn) Test() {
+	h.ready = make(chan struct{})
+	s := httptest.NewServer(h)
+	var err error
+	h.ClientConn, _, err = websocket.DefaultDialer.Dial(
+		strings.Replace(s.URL, "http://", "ws://", 1), nil)
+	if err != nil {
+		return
+	}
+	<-h.ready
+}
+
 func (h *watcherConn) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	var upgrader = websocket.Upgrader{}
-	conn, _ := upgrader.Upgrade(w, r, nil)
+	h.ServerConn, _ = upgrader.Upgrade(w, r, nil)
+	close(h.ready)
 	for {
-		conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
-		conn.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(time.Second))
-		_, _, err := conn.ReadMessage()
+		_, _, err := h.ServerConn.ReadMessage()
 		if err != nil {
 			return
 		}
 		<-closeCh
-		conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second))
-		conn.Close()
+		h.ServerConn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second))
+		h.ServerConn.Close()
 		return
 	}
 }
 
-func TestDoWebSocketListAndWatch(t *testing.T) {
-	s := httptest.NewServer(&watcherConn{})
-
-	conn, _, _ := websocket.DefaultDialer.Dial(
-		strings.Replace(s.URL, "http://", "ws://", 1), nil)
-
-	wss.SendEstablishError(conn, errors.New("error"))
+// NewTest creates a watcherConn and connects it to itself via Test.
+func NewTest() *watcherConn {
+	ts := &watcherConn{}
+	ts.Test()
+	return ts
+}
 
-	w := event.NewInstanceEventListWatcher("g", "s", func() (results []*discovery.WatchInstanceResponse, rev int64) {
-		results = append(results, &discovery.WatchInstanceResponse{
-			Response: discovery.CreateResponse(discovery.ResponseSuccess, "ok"),
-			Action:   string(discovery.EVT_CREATE),
-			Key:      &discovery.MicroServiceKey{},
-			Instance: &discovery.MicroServiceInstance{},
-		})
-		return
+func TestNewWebSocket(t *testing.T) {
+	mock := NewTest()
+	t.Run("should return not nil when new", func(t *testing.T) {
+		assert.NotNil(t, wss.NewWebSocket("", "", mock.ServerConn))
 	})
+}
 
-	ws := wss.New(context.Background(), conn, w)
-	err := ws.Init()
-	if err != nil {
-		t.Fatalf("TestPublisher_Run")
+func TestWebSocket_NeedCheck(t *testing.T) {
+	mock := NewTest()
+	conn := mock.ServerConn
+	options := wss.ToOptions()
+	webSocket := &wss.WebSocket{
+		Options:       options,
+		DomainProject: "default",
+		ConsumerID:    "",
+		Conn:          conn,
 	}
 
-	event.Center().Start()
-
-	go func() {
-		wss.ListAndWatch(context.Background(), "", nil, conn)
-
-		w2 := event.NewInstanceEventListWatcher("g", "s", func() (results []*discovery.WatchInstanceResponse, rev int64) {
-			return
-		})
-		ws2 := wss.New(context.Background(), conn, w2)
-		err := ws2.Init()
-		if err != nil {
-			t.Fatalf("TestPublisher_Run")
-		}
-	}()
-
-	go ws.HandleControlMessage()
-
-	w.OnMessage(nil)
-	w.OnMessage(&event.InstanceEvent{})
-
-	event.Center().Fire(event.NewInstanceEvent("g", "s", 1, &discovery.WatchInstanceResponse{
-		Response: discovery.CreateResponse(discovery.ResponseSuccess, "ok"),
-		Action:   string(discovery.EVT_CREATE),
-		Key:      &discovery.MicroServiceKey{},
-		Instance: &discovery.MicroServiceInstance{},
-	}))
-
-	<-time.After(time.Second)
-
-	ws.HandleEvent(nil)
-
-	ws.Heartbeat(websocket.PingMessage)
-	ws.Heartbeat(websocket.PongMessage)
-
-	ws.HandleEvent(time.Now())
+	t.Run("should not check when new", func(t *testing.T) {
+		webSocket.HealthInterval = time.Second
+		webSocket.Init()
+		assert.Nil(t, webSocket.NeedCheck())
+	})
 
-	closeCh <- struct{}{}
+	t.Run("should check when check time up", func(t *testing.T) {
+		webSocket.HealthInterval = time.Microsecond
+		webSocket.Init()
+		<-time.After(time.Microsecond)
+		assert.NotNil(t, webSocket.NeedCheck())
+	})
+	t.Run("should not check when busy", func(t *testing.T) {
+		webSocket.HealthInterval = time.Microsecond
+		webSocket.Init()
+		<-time.After(time.Microsecond)
+		assert.NotNil(t, webSocket.NeedCheck())
+		assert.Nil(t, webSocket.NeedCheck())
+	})
+}
 
-	<-time.After(time.Second)
+func TestWebSocket_Idle(t *testing.T) {
+	mock := NewTest()
+	webSocket := wss.NewWebSocket("", "", mock.ServerConn)
 
-	ws.Heartbeat(websocket.PingMessage)
-	ws.Heartbeat(websocket.PongMessage)
+	t.Run("should idle when new", func(t *testing.T) {
+		select {
+		case <-webSocket.Idle():
+		default:
+			assert.Fail(t, "not idle")
+		}
+	})
+	t.Run("should idle when setIdle", func(t *testing.T) {
+		select {
+		case <-webSocket.Idle():
+			assert.Fail(t, "idle")
+		default:
+			webSocket.SetIdle()
+			select {
+			case <-webSocket.Idle():
+			default:
+				assert.Fail(t, "not idle")
+			}
+		}
+	})
+	t.Run("should idle when checkHealth", func(t *testing.T) {
+		_ = webSocket.CheckHealth(context.Background())
+		select {
+		case <-webSocket.Idle():
+		default:
+			assert.Fail(t, "not idle")
+		}
+	})
+}
 
-	w.OnMessage(nil)
+func TestWebSocket_CheckHealth(t *testing.T) {
+	mock := NewTest()
+	event.Center().Start()
 
-	wss.Instance().Stop()
+	t.Run("should do nothing when recv PING", func(t *testing.T) {
+		ws := wss.NewWebSocket("", "", mock.ServerConn)
+		mock.ClientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
+		<-time.After(time.Second)
+		assert.Nil(t, ws.CheckHealth(context.Background()))
+	})
+	t.Run("should return err when consumer not exist", func(t *testing.T) {
+		ws := wss.NewWebSocket("", "", mock.ServerConn)
+		assert.Equal(t, "Service does not exist.", ws.CheckHealth(context.Background()).Error())
+	})
 }
diff --git a/server/event/instance_event.go b/server/event/instance_event.go
new file mode 100644
index 0000000..74f30fc
--- /dev/null
+++ b/server/event/instance_event.go
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+	"github.com/apache/servicecomb-service-center/pkg/event"
+	simple "github.com/apache/servicecomb-service-center/pkg/time"
+	pb "github.com/go-chassis/cari/discovery"
+)
+
+// QueueSize is the queue size registered for the INSTANCE event type.
+const QueueSize = 5000
+
+// INSTANCE is the event type registered under the name "INSTANCE" with QueueSize.
+var INSTANCE = event.RegisterType("INSTANCE", QueueSize)
+
+// InstanceEvent is the event pushed when an instance's status changes.
+type InstanceEvent struct {
+	event.Event
+	Revision int64
+	Response *pb.WatchInstanceResponse
+}
+
+// NewInstanceEvent creates an INSTANCE event for serviceID in domainProject,
+// carrying revision rev and the watch response.
+func NewInstanceEvent(serviceID, domainProject string, rev int64, response *pb.WatchInstanceResponse) *InstanceEvent {
+	return &InstanceEvent{
+		Event:    event.NewEvent(INSTANCE, domainProject, serviceID),
+		Revision: rev,
+		Response: response,
+	}
+}
+
+// NewInstanceEventWithTime is like NewInstanceEvent but builds the embedded
+// event with event.NewEventWithTime, passing createAt through.
+func NewInstanceEventWithTime(serviceID, domainProject string, rev int64, createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
+	return &InstanceEvent{
+		Event:    event.NewEventWithTime(INSTANCE, domainProject, serviceID, createAt),
+		Revision: rev,
+		Response: response,
+	}
+}
diff --git a/server/event/instance_subscriber.go b/server/event/instance_subscriber.go
index f61f622..9d5c1c4 100644
--- a/server/event/instance_subscriber.go
+++ b/server/event/instance_subscriber.go
@@ -18,39 +18,24 @@
 package event
 
 import (
-	"context"
-	"time"
-
+	"errors"
+
 	"github.com/apache/servicecomb-service-center/pkg/event"
-	"github.com/apache/servicecomb-service-center/pkg/gopool"
 	"github.com/apache/servicecomb-service-center/pkg/log"
-	simple "github.com/apache/servicecomb-service-center/pkg/time"
-	pb "github.com/go-chassis/cari/discovery"
-)
-
-const (
-	AddJobTimeout  = 1 * time.Second
-	EventQueueSize = 5000
+	"github.com/apache/servicecomb-service-center/server/metrics"
 )
 
-var INSTANCE = event.RegisterType("INSTANCE", EventQueueSize)
-
-// 状态变化推送
-type InstanceEvent struct {
-	event.Event
-	Revision int64
-	Response *pb.WatchInstanceResponse
-}
+// errBusy is reported for queued events that are dropped because Job is full.
+var errBusy = errors.New("too busy")
 
-type InstanceEventListWatcher struct {
+// InstanceSubscriber is an INSTANCE-type subscriber that buffers events in Job.
+type InstanceSubscriber struct {
 	event.Subscriber
-	Job          chan *InstanceEvent
-	ListRevision int64
-	ListFunc     func() (results []*pb.WatchInstanceResponse, rev int64)
-	listCh       chan struct{}
+	Job chan *InstanceEvent
 }
 
-func (w *InstanceEventListWatcher) SetError(err error) {
+// SetError records err on the subscriber and fires an unhealthy event for it.
+func (w *InstanceSubscriber) SetError(err error) {
 	w.Subscriber.SetError(err)
-	// 触发清理job
+	// fire an unhealthy event to trigger the cleanup job
 	e := w.Bus().Fire(event.NewUnhealthyEvent(w))
@@ -59,108 +44,72 @@ func (w *InstanceEventListWatcher) SetError(err error) {
 	}
 }
 
-func (w *InstanceEventListWatcher) OnAccept() {
+// OnAccept logs the acceptance at debug level unless the subscriber has failed.
+func (w *InstanceSubscriber) OnAccept() {
 	if w.Err() != nil {
 		return
 	}
 	log.Debugf("accepted by event service, %s watcher %s %s", w.Type(), w.Group(), w.Subject())
-	gopool.Go(w.listAndPublishJobs)
-}
-
-func (w *InstanceEventListWatcher) listAndPublishJobs(_ context.Context) {
-	defer close(w.listCh)
-	if w.ListFunc == nil {
-		return
-	}
-	results, rev := w.ListFunc()
-	w.ListRevision = rev
-	for _, response := range results {
-		w.sendMessage(NewInstanceEvent(w.Group(), w.Subject(), w.ListRevision, response))
-	}
 }
 
-//被通知
-func (w *InstanceEventListWatcher) OnMessage(job event.Event) {
+// OnMessage is notified of bus events and forwards *InstanceEvent values to
+// sendMessage; other event types, or any event after an error, are ignored.
+func (w *InstanceSubscriber) OnMessage(evt event.Event) {
 	if w.Err() != nil {
 		return
 	}
 
-	wJob, ok := job.(*InstanceEvent)
+	wJob, ok := evt.(*InstanceEvent)
 	if !ok {
 		return
 	}
-
-	select {
-	case <-w.listCh:
-	default:
-		timer := time.NewTimer(w.Timeout())
-		select {
-		case <-w.listCh:
-			timer.Stop()
-		case <-timer.C:
-			log.Errorf(nil,
-				"the %s listwatcher %s %s is not ready[over %s], send the event %v",
-				w.Type(), w.Group(), w.Subject(), w.Timeout(), job)
-		}
-	}
-
-	// the negative revision is specially for mongo scene,should be removed after mongo support revison.
-	if wJob.Revision >= 0 && wJob.Revision <= w.ListRevision {
-		log.Warnf("unexpected event %s job is coming in, watcher %s %s, job is %v, current revision is %v",
-			w.Type(), w.Group(), w.Subject(), job, w.ListRevision)
-		return
-	}
 	w.sendMessage(wJob)
 }
 
-func (w *InstanceEventListWatcher) sendMessage(job *InstanceEvent) {
+// sendMessage enqueues evt into Job. If Job is full, all queued events are
+// drained and reported with errBusy, and then evt is enqueued.
+func (w *InstanceSubscriber) sendMessage(evt *InstanceEvent) {
 	defer log.Recover()
+
+	metrics.ReportPendingCompleted(evt)
+
 	select {
-	case w.Job <- job:
+	case w.Job <- evt:
 	default:
-		timer := time.NewTimer(w.Timeout())
-		select {
-		case w.Job <- job:
-			timer.Stop()
-		case <-timer.C:
-			log.Errorf(nil,
-				"the %s watcher %s %s event queue is full[over %s], drop the event %v",
-				w.Type(), w.Group(), w.Subject(), w.Timeout(), job)
-		}
+		log.Errorf(nil, "the %s watcher %s %s event queue is full, drop the blocked events",
+			w.Type(), w.Group(), w.Subject())
+		w.cleanup()
+		w.Job <- evt
 	}
 }
 
-func (w *InstanceEventListWatcher) Timeout() time.Duration {
-	return AddJobTimeout
-}
-
-func (w *InstanceEventListWatcher) Close() {
-	close(w.Job)
-}
-
-func NewInstanceEvent(serviceID, domainProject string, rev int64, response *pb.WatchInstanceResponse) *InstanceEvent {
-	return &InstanceEvent{
-		Event:    event.NewEvent(INSTANCE, domainProject, serviceID),
-		Revision: rev,
-		Response: response,
+// cleanup drains Job without blocking, reporting each drained event with errBusy.
+func (w *InstanceSubscriber) cleanup() {
+	for {
+		select {
+		case evt, ok := <-w.Job:
+			if !ok {
+				return
+			}
+			metrics.ReportPublishCompleted(evt, errBusy)
+		default:
+			return
+		}
 	}
 }
 
-func NewInstanceEventWithTime(serviceID, domainProject string, rev int64, createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
-	return &InstanceEvent{
-		Event:    event.NewEventWithTime(INSTANCE, domainProject, serviceID, createAt),
-		Revision: rev,
-		Response: response,
-	}
+// Close drains any queued events (see cleanup) and then closes Job.
+func (w *InstanceSubscriber) Close() {
+	w.cleanup()
+	close(w.Job)
 }
 
-func NewInstanceEventListWatcher(serviceID, domainProject string,
-	listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *InstanceEventListWatcher {
-	watcher := &InstanceEventListWatcher{
+// NewInstanceSubscriber creates an INSTANCE subscriber for serviceID in
+// domainProject whose Job buffer holds INSTANCE.QueueSize() events.
+func NewInstanceSubscriber(serviceID, domainProject string) *InstanceSubscriber {
+	watcher := &InstanceSubscriber{
 		Subscriber: event.NewSubscriber(INSTANCE, domainProject, serviceID),
 		Job:        make(chan *InstanceEvent, INSTANCE.QueueSize()),
-		ListFunc:   listFunc,
-		listCh:     make(chan struct{}),
 	}
 	return watcher
 }
diff --git a/server/metrics/connection.go b/server/metrics/connection.go
index aea7c97..c012980 100644
--- a/server/metrics/connection.go
+++ b/server/metrics/connection.go
@@ -44,6 +44,24 @@ var (
 			Objectives: metrics.Pxx,
 		}, []string{"instance", "source", "status"})
 
+	// NOTE(review): "_total" is conventionally reserved for counters; renaming would break existing queries.
+	pendingGauge = helper.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: metrics.FamilyName,
+			Subsystem: "notify",
+			Name:      "pending_total",
+			Help:      "Gauge of pending instance events",
+		}, []string{"instance", "source"})
+
+	pendingLatency = helper.NewSummaryVec(
+		prometheus.SummaryOpts{
+			Namespace:  metrics.FamilyName,
+			Subsystem:  "notify",
+			Name:       "pending_durations_microseconds",
+			Help:       "Latency of pending instance events",
+			Objectives: metrics.Pxx,
+		}, []string{"instance", "source"})
+
 	subscriberGauge = helper.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: metrics.FamilyName,
@@ -62,10 +80,20 @@ func ReportPublishCompleted(evt event.Event, err error) {
 	}
 	notifyLatency.WithLabelValues(instance, evt.Type().String(), status).Observe(elapsed)
 	notifyCounter.WithLabelValues(instance, evt.Type().String(), status).Inc()
+	pendingGauge.WithLabelValues(instance, evt.Type().String()).Dec()
 }
 
-func ReportSubscriber(domain, scheme string, n float64) {
+// ReportPendingCompleted increments pendingGauge for evt and observes, in
+// microseconds, the time elapsed since evt.CreateAt() in pendingLatency.
+func ReportPendingCompleted(evt event.Event) {
 	instance := metrics.InstanceName()
+	elapsed := float64(time.Since(evt.CreateAt()).Nanoseconds()) / float64(time.Microsecond)
+	pendingLatency.WithLabelValues(instance, evt.Type().String()).Observe(elapsed)
+	pendingGauge.WithLabelValues(instance, evt.Type().String()).Inc()
+}
 
+// ReportSubscriber adds n to the subscriber gauge for the given domain and scheme.
+func ReportSubscriber(domain, scheme string, n float64) {
+	instance := metrics.InstanceName()
 	subscriberGauge.WithLabelValues(instance, domain, scheme).Add(n)
 }
diff --git a/server/rest/controller/v3/instance_watcher.go b/server/rest/controller/v3/instance_watcher.go
index 7395c4c..6d0690b 100644
--- a/server/rest/controller/v3/instance_watcher.go
+++ b/server/rest/controller/v3/instance_watcher.go
@@ -28,6 +28,5 @@ type WatchService struct {
 func (this *WatchService) URLPatterns() []rest.Route {
 	return []rest.Route{
 		{rest.HTTPMethodGet, "/registry/v3/microservices/:serviceId/watcher", this.Watch},
-		{rest.HTTPMethodGet, "/registry/v3/microservices/:serviceId/listwatcher", this.ListAndWatch},
 	}
 }
diff --git a/server/rest/controller/v4/instance_watcher.go b/server/rest/controller/v4/instance_watcher.go
index e6de6f3..9396ee5 100644
--- a/server/rest/controller/v4/instance_watcher.go
+++ b/server/rest/controller/v4/instance_watcher.go
@@ -34,7 +34,6 @@ type WatchService struct {
 func (s *WatchService) URLPatterns() []rest.Route {
 	return []rest.Route{
 		{Method: rest.HTTPMethodGet, Path: "/v4/:project/registry/microservices/:serviceId/watcher", Func: s.Watch},
-		{Method: rest.HTTPMethodGet, Path: "/v4/:project/registry/microservices/:serviceId/listwatcher", Func: s.ListAndWatch},
 	}
 }
 
@@ -63,16 +62,3 @@ func (s *WatchService) Watch(w http.ResponseWriter, r *http.Request) {
 		SelfServiceId: r.URL.Query().Get(":serviceId"),
 	}, conn)
 }
-
-func (s *WatchService) ListAndWatch(w http.ResponseWriter, r *http.Request) {
-	conn, err := upgrade(w, r)
-	if err != nil {
-		return
-	}
-	defer conn.Close()
-
-	r.Method = "WATCHLIST"
-	core.InstanceAPI.WebSocketListAndWatch(r.Context(), &pb.WatchInstanceRequest{
-		SelfServiceId: r.URL.Query().Get(":serviceId"),
-	}, conn)
-}
diff --git a/server/service/watch.go b/server/service/watch.go
index b3c00df..8813b0f 100644
--- a/server/service/watch.go
+++ b/server/service/watch.go
@@ -54,7 +54,7 @@ func (s *InstanceService) Watch(in *pb.WatchInstanceRequest, stream proto.Servic
 		return err
 	}
 
-	return grpc.ListAndWatch(stream.Context(), in.SelfServiceId, nil, stream)
+	return grpc.Watch(stream.Context(), in.SelfServiceId, stream)
 }
 
 func (s *InstanceService) WebSocketWatch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) {
@@ -63,18 +63,7 @@ func (s *InstanceService) WebSocketWatch(ctx context.Context, in *pb.WatchInstan
 		ws.SendEstablishError(conn, err)
 		return
 	}
-	ws.ListAndWatch(ctx, in.SelfServiceId, nil, conn)
-}
-
-func (s *InstanceService) WebSocketListAndWatch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) {
-	log.Infof("new a web socket list and watch with service[%s]", in.SelfServiceId)
-	if err := s.WatchPreOpera(ctx, in); err != nil {
-		ws.SendEstablishError(conn, err)
-		return
-	}
-	ws.ListAndWatch(ctx, in.SelfServiceId, func() ([]*pb.WatchInstanceResponse, int64) {
-		return s.QueryAllProvidersInstances(ctx, in)
-	}, conn)
+	ws.Watch(ctx, in.SelfServiceId, conn)
 }
 
 func (s *InstanceService) QueryAllProvidersInstances(ctx context.Context, in *pb.WatchInstanceRequest) ([]*pb.WatchInstanceResponse, int64) {
diff --git a/server/service/watch_test.go b/server/service/watch_test.go
index b663cc8..a058399 100644
--- a/server/service/watch_test.go
+++ b/server/service/watch_test.go
@@ -46,13 +46,6 @@ func TestInstanceService_WebSocketWatch(t *testing.T) {
 	instanceResource.WebSocketWatch(context.Background(), &pb.WatchInstanceRequest{}, nil)
 }
 
-func TestInstanceService_WebSocketListAndWatch(t *testing.T) {
-	defer func() {
-		recover()
-	}()
-	instanceResource.WebSocketListAndWatch(context.Background(), &pb.WatchInstanceRequest{}, nil)
-}
-
 var _ = Describe("'Instance' service", func() {
 	Describe("execute 'watch' operartion", func() {
 		var (