You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/05/17 12:00:20 UTC

[servicecomb-service-center] branch v1.x updated: SCB-2176 Refactor websocket (#981)

This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch v1.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/v1.x by this push:
     new db013af  SCB-2176 Refactor websocket (#981)
db013af is described below

commit db013afc9c9d1673999190b9ceb967a08e673813
Author: little-cui <su...@qq.com>
AuthorDate: Mon May 17 20:00:11 2021 +0800

    SCB-2176 Refactor websocket (#981)
    
    * SCB-2176 Refactor websocket
    
    * SCB-2176 Add UTs
---
 integration/apis.go                                |   1 -
 integration/instances_test.go                      |  12 -
 server/connection/grpc/stream.go                   |   5 +-
 server/connection/grpc/stream_test.go              |   4 +-
 server/connection/ws/broker.go                     |  81 +++++++
 .../connection/ws/{options.go => broker_test.go}   |  30 ++-
 server/connection/ws/common.go                     |  33 ++-
 .../services.go => connection/ws/common_test.go}   |  35 ++-
 .../ws/{keepalive.go => health_check.go}           |  90 ++++----
 .../ws/{options.go => health_check_test.go}        |  29 ++-
 server/connection/ws/options.go                    |  10 +-
 server/connection/ws/websocket.go                  | 244 +++++++--------------
 server/connection/ws/websocket_test.go             | 187 +++++++++-------
 server/core/proto/services.go                      |   1 -
 server/event/instance_subscriber.go                |  46 +---
 server/rest/controller/v3/instance_watcher.go      |   1 -
 server/rest/controller/v4/instance_watcher.go      |  14 --
 server/service/watch.go                            |  15 +-
 server/service/watch_test.go                       |   7 -
 19 files changed, 412 insertions(+), 433 deletions(-)

diff --git a/integration/apis.go b/integration/apis.go
index d263a6b..f8aa164 100644
--- a/integration/apis.go
+++ b/integration/apis.go
@@ -47,7 +47,6 @@ var UPDATEINSTANCEMETADATA = "/v4/default/registry/microservices/:serviceId/inst
 var UPDATEINSTANCESTATUS = "/v4/default/registry/microservices/:serviceId/instances/:instanceId/status"
 var INSTANCEHEARTBEAT = "/v4/default/registry/microservices/:serviceId/instances/:instanceId/heartbeat"
 var INSTANCEWATCHER = "/v4/default/registry/microservices/:serviceId/watcher"
-var INSTANCELISTWATCHER = "/v4/default/registry/microservices/:serviceId/listwatcher"
 
 //Governance API's
 var GETGOVERNANCESERVICEDETAILS = "/v4/default/govern/microservices/:serviceId"
diff --git a/integration/instances_test.go b/integration/instances_test.go
index 0f90d31..b84fbc4 100644
--- a/integration/instances_test.go
+++ b/integration/instances_test.go
@@ -599,18 +599,6 @@ var _ = Describe("MicroService Api Test", func() {
 
 				Expect(resp.StatusCode).To(Equal(http.StatusBadRequest))
 			})
-			It("Call the listwatcher API ", func() {
-				//This api gives 400 bad request for the integration test
-				// as integration test is not able to make ws connection
-				url := strings.Replace(INSTANCELISTWATCHER, ":serviceId", serviceId, 1)
-				req, _ := http.NewRequest(GET, SCURL+url, nil)
-				req.Header.Set("X-Domain-Name", "default")
-				resp, err := scclient.Do(req)
-				Expect(err).To(BeNil())
-				defer resp.Body.Close()
-
-				Expect(resp.StatusCode).To(Equal(http.StatusBadRequest))
-			})
 		})
 	})
 
diff --git a/server/connection/grpc/stream.go b/server/connection/grpc/stream.go
index 0609bdb..0e5415e 100644
--- a/server/connection/grpc/stream.go
+++ b/server/connection/grpc/stream.go
@@ -21,7 +21,6 @@ import (
 	"context"
 	"errors"
 	"github.com/apache/servicecomb-service-center/pkg/log"
-	pb "github.com/apache/servicecomb-service-center/pkg/registry"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 	"github.com/apache/servicecomb-service-center/server/connection"
 	"github.com/apache/servicecomb-service-center/server/core/proto"
@@ -67,10 +66,10 @@ func Handle(watcher *event.InstanceSubscriber, stream proto.ServiceInstanceCtrl_
 	}
 }
 
-func ListAndWatch(ctx context.Context, serviceID string, f func() ([]*pb.WatchInstanceResponse, int64), stream proto.ServiceInstanceCtrl_WatchServer) (err error) {
+func Watch(ctx context.Context, serviceID string, stream proto.ServiceInstanceCtrl_WatchServer) (err error) {
 	domainProject := util.ParseDomainProject(ctx)
 	domain := util.ParseDomain(ctx)
-	watcher := event.NewInstanceSubscriber(serviceID, domainProject, f)
+	watcher := event.NewInstanceSubscriber(serviceID, domainProject)
 	err = event.Center().AddSubscriber(watcher)
 	if err != nil {
 		return
diff --git a/server/connection/grpc/stream_test.go b/server/connection/grpc/stream_test.go
index 6ee8eb3..c549b65 100644
--- a/server/connection/grpc/stream_test.go
+++ b/server/connection/grpc/stream_test.go
@@ -41,7 +41,7 @@ func (x *grpcWatchServer) Context() context.Context {
 }
 
 func TestHandleWatchJob(t *testing.T) {
-	w := event.NewInstanceSubscriber("g", "s", nil)
+	w := event.NewInstanceSubscriber("g", "s")
 	w.Job <- nil
 	err := stream.Handle(w, &grpcWatchServer{})
 	if err == nil {
@@ -54,6 +54,6 @@ func TestHandleWatchJob(t *testing.T) {
 
 func TestDoStreamListAndWatch(t *testing.T) {
 	defer log.Recover()
-	err := stream.ListAndWatch(context.Background(), "s", nil, nil)
+	err := stream.Watch(context.Background(), "s", nil)
 	t.Fatal("TestDoStreamListAndWatch failed", err)
 }
diff --git a/server/connection/ws/broker.go b/server/connection/ws/broker.go
new file mode 100644
index 0000000..e4901ff
--- /dev/null
+++ b/server/connection/ws/broker.go
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ws
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	pb "github.com/apache/servicecomb-service-center/pkg/registry"
+	"github.com/apache/servicecomb-service-center/pkg/util"
+	"github.com/apache/servicecomb-service-center/server/connection"
+	"github.com/apache/servicecomb-service-center/server/event"
+)
+
+type Broker struct {
+	consumer *WebSocket
+	producer *event.InstanceSubscriber
+}
+
+func (b *Broker) Listen(ctx context.Context) error {
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case instanceEvent, ok := <-b.producer.Job:
+			if !ok {
+				return fmt.Errorf("read producer[%v] event failed", b.producer.Group())
+			}
+			err := b.write(instanceEvent)
+			if err != nil {
+				log.Errorf(err, "write instance event to subscriber[%s] failed, group: %s",
+					b.consumer.RemoteAddr, b.producer.Group())
+				return err
+			}
+		}
+	}
+}
+func (b *Broker) write(evt *event.InstanceEvent) error {
+	resp := evt.Response
+	providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, resp.Key.ServiceName, resp.Key.Version)
+	if resp.Action != string(pb.EVT_EXPIRE) {
+		providerFlag = fmt.Sprintf("%s/%s(%s)", resp.Instance.ServiceId, resp.Instance.InstanceId, providerFlag)
+	}
+	remoteAddr := b.consumer.Conn.RemoteAddr().String()
+	log.Infof("event[%s] is coming in, subscriber[%s] watch %s, group: %s",
+		resp.Action, remoteAddr, providerFlag, b.producer.Group())
+
+	resp.Response = nil
+	data, err := json.Marshal(resp)
+	if err != nil {
+		log.Errorf(err, "subscriber[%s] watch %s, group: %s",
+			remoteAddr, providerFlag, b.producer.Group())
+		data = util.StringToBytesWithNoCopy(fmt.Sprintf("marshal output file error, %s", err.Error()))
+	}
+	err = b.consumer.WriteTextMessage(data)
+	connection.ReportPublishCompleted(evt, err)
+	return err
+}
+
+func NewBroker(ws *WebSocket, is *event.InstanceSubscriber) *Broker {
+	return &Broker{
+		consumer: ws,
+		producer: is,
+	}
+}
diff --git a/server/connection/ws/options.go b/server/connection/ws/broker_test.go
similarity index 59%
copy from server/connection/ws/options.go
copy to server/connection/ws/broker_test.go
index 5817ef0..a3597c3 100644
--- a/server/connection/ws/options.go
+++ b/server/connection/ws/broker_test.go
@@ -15,22 +15,28 @@
  * limitations under the License.
  */
 
-package ws
+package ws_test
 
 import (
-	"time"
-
-	"github.com/apache/servicecomb-service-center/server/connection"
+	"context"
+	"github.com/apache/servicecomb-service-center/server/connection/ws"
+	"github.com/apache/servicecomb-service-center/server/event"
+	"github.com/stretchr/testify/assert"
+	"testing"
 )
 
-type Options struct {
-	ReadTimeout time.Duration
-	SendTimeout time.Duration
+func TestNewBroker(t *testing.T) {
+	t.Run("should not return nil when new broker", func(t *testing.T) {
+		assert.NotNil(t, ws.NewBroker(nil, nil))
+
+	})
 }
 
-func ToOptions() Options {
-	return Options{
-		ReadTimeout: connection.ReadTimeout,
-		SendTimeout: connection.SendTimeout,
-	}
+func TestBroker_Listen(t *testing.T) {
+	t.Run("should return err when listen context cancelled", func(t *testing.T) {
+		broker := ws.NewBroker(nil, event.NewInstanceSubscriber("", ""))
+		ctx, cancel := context.WithCancel(context.Background())
+		cancel()
+		assert.Equal(t, context.Canceled, broker.Listen(ctx))
+	})
 }
diff --git a/server/connection/ws/common.go b/server/connection/ws/common.go
index 15a060d..0132202 100644
--- a/server/connection/ws/common.go
+++ b/server/connection/ws/common.go
@@ -19,32 +19,43 @@ package ws
 
 import (
 	"context"
+	"fmt"
+	"github.com/apache/servicecomb-service-center/pkg/gopool"
 	"github.com/apache/servicecomb-service-center/pkg/log"
-	"github.com/apache/servicecomb-service-center/pkg/registry"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 	"github.com/apache/servicecomb-service-center/server/connection"
 	"github.com/apache/servicecomb-service-center/server/event"
 	"github.com/gorilla/websocket"
 )
 
-func ListAndWatch(ctx context.Context, serviceID string, f func() ([]*registry.WatchInstanceResponse, int64), conn *websocket.Conn) {
+func Watch(ctx context.Context, serviceID string, conn *websocket.Conn) {
 	domainProject := util.ParseDomainProject(ctx)
 	domain := util.ParseDomain(ctx)
-	socket := New(ctx, conn, event.NewInstanceSubscriber(serviceID, domainProject, f))
 
-	connection.ReportSubscriber(domain, Websocket, 1)
-	process(socket)
-	connection.ReportSubscriber(domain, Websocket, -1)
-}
+	ws := NewWebSocket(domainProject, serviceID, conn)
+	HealthChecker().Accept(ws)
 
-func process(socket *WebSocket) {
-	if err := socket.Init(); err != nil {
+	subscriber := event.NewInstanceSubscriber(serviceID, domainProject)
+	err := event.Center().AddSubscriber(subscriber)
+	if err != nil {
+		SendEstablishError(conn, err)
 		return
 	}
 
-	socket.HandleControlMessage()
+	connection.ReportSubscriber(domain, Websocket, 1)
+	defer connection.ReportSubscriber(domain, Websocket, -1)
 
-	socket.Stop()
+	pool := gopool.New(ctx).Do(func(ctx context.Context) {
+		if err := NewBroker(ws, subscriber).Listen(ctx); err != nil {
+			log.Error(fmt.Sprintf("[%s] listen service[%s] failed", conn.RemoteAddr(), serviceID), err)
+		}
+	})
+	defer pool.Done()
+
+	if err := ws.ReadMessage(); err != nil {
+		log.Error(fmt.Sprintf("[%s] handle service[%s] control message failed", conn.RemoteAddr(), serviceID), err)
+		subscriber.SetError(err)
+	}
 }
 
 func SendEstablishError(conn *websocket.Conn, err error) {
diff --git a/server/core/proto/services.go b/server/connection/ws/common_test.go
similarity index 53%
copy from server/core/proto/services.go
copy to server/connection/ws/common_test.go
index 12f6af4..df8ae55 100644
--- a/server/core/proto/services.go
+++ b/server/connection/ws/common_test.go
@@ -14,21 +14,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package proto
+
+package ws_test
 
 import (
 	"context"
-	"github.com/apache/servicecomb-service-center/pkg/registry"
-	"github.com/gorilla/websocket"
-)
-
-type ServiceInstanceCtrlServerEx interface {
-	ServiceInstanceCtrlServer
+	"errors"
+	"testing"
 
-	BatchFind(ctx context.Context, in *registry.BatchFindInstancesRequest) (*registry.BatchFindInstancesResponse, error)
+	wss "github.com/apache/servicecomb-service-center/server/connection/ws"
+	"github.com/stretchr/testify/assert"
+)
 
-	WebSocketWatch(ctx context.Context, in *registry.WatchInstanceRequest, conn *websocket.Conn)
-	WebSocketListAndWatch(ctx context.Context, in *registry.WatchInstanceRequest, conn *websocket.Conn)
+func TestSendEstablishError(t *testing.T) {
+	mock := NewTest()
+	t.Run("should read the err when call", func(t *testing.T) {
+		wss.SendEstablishError(mock.ServerConn, errors.New("error"))
+		_, message, err := mock.ClientConn.ReadMessage()
+		assert.Nil(t, err)
+		assert.Equal(t, "error", string(message))
+	})
+}
 
-	ClusterHealth(ctx context.Context) (*registry.GetInstancesResponse, error)
+func TestWatch(t *testing.T) {
+	t.Run("should return when ctx cancelled", func(t *testing.T) {
+		mock := NewTest()
+		mock.ServerConn.Close()
+		ctx, cancel := context.WithCancel(context.Background())
+		cancel()
+		wss.Watch(ctx, "", mock.ServerConn)
+	})
 }
diff --git a/server/connection/ws/keepalive.go b/server/connection/ws/health_check.go
similarity index 57%
rename from server/connection/ws/keepalive.go
rename to server/connection/ws/health_check.go
index 3bd7e8b..aeb4f38 100644
--- a/server/connection/ws/keepalive.go
+++ b/server/connection/ws/health_check.go
@@ -19,39 +19,36 @@ package ws
 
 import (
 	"context"
-	"github.com/apache/servicecomb-service-center/pkg/gopool"
+	"fmt"
 	"sync"
 	"time"
+
+	"github.com/apache/servicecomb-service-center/pkg/gopool"
+	"github.com/apache/servicecomb-service-center/pkg/log"
 )
 
-var runner *KeepaliveRunner
+var checker *HealthCheck
 
 func init() {
-	runner = NewRunner()
-	runner.Run()
+	checker = NewHealthCheck()
+	checker.Run()
 }
 
-type KeepaliveRunner struct {
+type HealthCheck struct {
 	wss       []*WebSocket
 	lock      sync.Mutex
 	goroutine *gopool.Pool
 }
 
-func (wh *KeepaliveRunner) Run() {
-	gopool.Go(runner.loop)
+func (wh *HealthCheck) Run() {
+	gopool.Go(checker.loop)
 }
 
-func (wh *KeepaliveRunner) Stop() {
+func (wh *HealthCheck) Stop() {
 	wh.goroutine.Close(true)
 }
 
-func (wh *KeepaliveRunner) dispatch(ws *WebSocket, payload interface{}) {
-	wh.goroutine.Do(func(ctx context.Context) {
-		ws.HandleEvent(payload)
-	})
-}
-
-func (wh *KeepaliveRunner) loop(ctx context.Context) {
+func (wh *HealthCheck) loop(ctx context.Context) {
 	defer wh.Stop()
 	ticker := time.NewTicker(500 * time.Millisecond)
 	for {
@@ -60,49 +57,52 @@ func (wh *KeepaliveRunner) loop(ctx context.Context) {
 			// server shutdown
 			return
 		case <-ticker.C:
-			var removes []int
-			for i, ws := range wh.wss {
-				if payload := ws.Pick(); payload != nil {
-					if _, ok := payload.(error); ok {
-						removes = append(removes, i)
-					}
-					wh.dispatch(ws, payload)
+			for _, ws := range wh.wss {
+				if t := ws.NeedCheck(); t == nil {
+					continue
 				}
+				wh.check(ws)
 			}
-			if len(removes) == 0 {
-				continue
-			}
-
-			wh.lock.Lock()
-			var (
-				news []*WebSocket
-				s    int
-			)
-			for _, e := range removes {
-				news = append(news, wh.wss[s:e]...)
-				s = e + 1
-			}
-			if s < len(wh.wss) {
-				news = append(news, wh.wss[s:]...)
-			}
-			wh.wss = news
-			wh.lock.Unlock()
 		}
 	}
 }
 
-func (wh *KeepaliveRunner) Accept(ws *WebSocket) {
+func (wh *HealthCheck) check(ws *WebSocket) {
+	wh.goroutine.Do(func(ctx context.Context) {
+		if err := ws.CheckHealth(ctx); err != nil {
+			wh.Remove(ws)
+			log.Error(fmt.Sprintf("checker removed unhealth websocket[%s]", ws.RemoteAddr), err)
+		}
+	})
+}
+
+func (wh *HealthCheck) Accept(ws *WebSocket) int {
 	wh.lock.Lock()
 	wh.wss = append(wh.wss, ws)
+	n := len(wh.wss)
+	wh.lock.Unlock()
+	return n
+}
+
+func (wh *HealthCheck) Remove(ws *WebSocket) int {
+	wh.lock.Lock()
+	for i, t := range wh.wss {
+		if t == ws {
+			wh.wss = append(wh.wss[0:i], wh.wss[i+1:]...)
+			break
+		}
+	}
+	n := len(wh.wss)
 	wh.lock.Unlock()
+	return n
 }
 
-func NewRunner() *KeepaliveRunner {
-	return &KeepaliveRunner{
+func NewHealthCheck() *HealthCheck {
+	return &HealthCheck{
 		goroutine: gopool.New(context.Background()),
 	}
 }
 
-func Runner() *KeepaliveRunner {
-	return runner
+func HealthChecker() *HealthCheck {
+	return checker
 }
diff --git a/server/connection/ws/options.go b/server/connection/ws/health_check_test.go
similarity index 62%
copy from server/connection/ws/options.go
copy to server/connection/ws/health_check_test.go
index 5817ef0..7ab0578 100644
--- a/server/connection/ws/options.go
+++ b/server/connection/ws/health_check_test.go
@@ -15,22 +15,27 @@
  * limitations under the License.
  */
 
-package ws
+package ws_test
 
 import (
-	"time"
-
-	"github.com/apache/servicecomb-service-center/server/connection"
+	"github.com/apache/servicecomb-service-center/server/connection/ws"
+	"github.com/stretchr/testify/assert"
+	"testing"
 )
 
-type Options struct {
-	ReadTimeout time.Duration
-	SendTimeout time.Duration
+func TestNewHealthCheck(t *testing.T) {
+	t.Run("should not return nil when new", func(t *testing.T) {
+		assert.NotNil(t, ws.NewHealthCheck())
+	})
 }
 
-func ToOptions() Options {
-	return Options{
-		ReadTimeout: connection.ReadTimeout,
-		SendTimeout: connection.SendTimeout,
-	}
+func TestHealthCheck_Run(t *testing.T) {
+	mock := NewTest()
+
+	t.Run("should return 1 when accept one ws", func(t *testing.T) {
+		check := ws.NewHealthCheck()
+		webSocket := ws.NewWebSocket("", "", mock.ServerConn)
+		assert.Equal(t, 1, check.Accept(webSocket))
+		assert.Equal(t, 0, check.Remove(webSocket))
+	})
 }
diff --git a/server/connection/ws/options.go b/server/connection/ws/options.go
index 5817ef0..3196623 100644
--- a/server/connection/ws/options.go
+++ b/server/connection/ws/options.go
@@ -24,13 +24,15 @@ import (
 )
 
 type Options struct {
-	ReadTimeout time.Duration
-	SendTimeout time.Duration
+	ReadTimeout    time.Duration
+	SendTimeout    time.Duration
+	HealthInterval time.Duration
 }
 
 func ToOptions() Options {
 	return Options{
-		ReadTimeout: connection.ReadTimeout,
-		SendTimeout: connection.SendTimeout,
+		ReadTimeout:    connection.ReadTimeout,
+		SendTimeout:    connection.SendTimeout,
+		HealthInterval: connection.HeartbeatInterval,
 	}
 }
diff --git a/server/connection/ws/websocket.go b/server/connection/ws/websocket.go
index fc6db9c..24a7e2b 100644
--- a/server/connection/ws/websocket.go
+++ b/server/connection/ws/websocket.go
@@ -19,13 +19,9 @@ package ws
 
 import (
 	"context"
-	"encoding/json"
 	"fmt"
 	"github.com/apache/servicecomb-service-center/pkg/log"
-	pb "github.com/apache/servicecomb-service-center/pkg/registry"
-	"github.com/apache/servicecomb-service-center/pkg/util"
 	"github.com/apache/servicecomb-service-center/server/connection"
-	"github.com/apache/servicecomb-service-center/server/event"
 	serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
 	"github.com/gorilla/websocket"
 	"time"
@@ -35,258 +31,182 @@ const Websocket = "Websocket"
 
 type WebSocket struct {
 	Options
-
-	ctx    context.Context
-	ticker *time.Ticker
-	conn   *websocket.Conn
-	// watcher subscribe the notification service event
-	watcher         *event.InstanceSubscriber
-	needPingWatcher bool
-	free            chan struct{}
-	closed          chan struct{}
+	Conn          *websocket.Conn
+	RemoteAddr    string
+	DomainProject string
+	ConsumerID    string
+
+	ticker   *time.Ticker
+	needPing bool
+	idleCh   chan struct{}
 }
 
-func (wh *WebSocket) Init() error {
-	wh.ticker = time.NewTicker(connection.HeartbeatInterval)
-	wh.needPingWatcher = true
-	wh.free = make(chan struct{}, 1)
-	wh.closed = make(chan struct{})
-
-	wh.SetReady()
+func (wh *WebSocket) Init() {
+	wh.RemoteAddr = wh.Conn.RemoteAddr().String()
+	wh.ticker = time.NewTicker(wh.HealthInterval)
+	wh.needPing = true
+	wh.idleCh = make(chan struct{}, 1)
 
-	remoteAddr := wh.conn.RemoteAddr().String()
+	wh.registerMessageHandler()
 
-	// put in notification service queue
-	if err := event.Center().AddSubscriber(wh.watcher); err != nil {
-		err = fmt.Errorf("establish[%s] websocket watch failed: notify service error, %s",
-			remoteAddr, err.Error())
-		log.Errorf(nil, err.Error())
+	wh.SetIdle()
 
-		werr := wh.conn.WriteMessage(websocket.TextMessage, util.StringToBytesWithNoCopy(err.Error()))
-		if werr != nil {
-			log.Errorf(werr, "establish[%s] websocket watch failed: write message failed.", remoteAddr)
-		}
-		return err
-	}
-
-	// put in runner queue
-	Runner().Accept(wh)
-
-	log.Debugf("start watching instance status, watcher[%s], subject: %s, group: %s",
-		remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
-	return nil
+	log.Debugf("start watching instance status, subscriber[%s], consumer: %s",
+		wh.RemoteAddr, wh.ConsumerID)
 }
 
-func (wh *WebSocket) HandleControlMessage() {
-	remoteAddr := wh.conn.RemoteAddr().String()
+func (wh *WebSocket) registerMessageHandler() {
+	remoteAddr := wh.RemoteAddr
 	// PING
-	wh.conn.SetPingHandler(func(message string) error {
+	wh.Conn.SetPingHandler(func(message string) error {
 		defer func() {
-			err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
+			err := wh.Conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
 			if err != nil {
 				log.Error("", err)
 			}
 		}()
-		if wh.needPingWatcher {
-			log.Infof("received 'Ping' message '%s' from watcher[%s], no longer send 'Ping' to it, subject: %s, group: %s",
-				message, remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
+		if wh.needPing {
+			log.Infof("received 'Ping' message '%s' from subscriber[%s], no longer send 'Ping' to it, consumer: %s",
+				message, remoteAddr, wh.ConsumerID)
 		}
-		wh.needPingWatcher = false
+		wh.needPing = false
 		return wh.WritePingPong(websocket.PongMessage)
 	})
 	// PONG
-	wh.conn.SetPongHandler(func(message string) error {
+	wh.Conn.SetPongHandler(func(message string) error {
 		defer func() {
-			err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
+			err := wh.Conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
 			if err != nil {
 				log.Error("", err)
 			}
 		}()
-		log.Debugf("received 'Pong' message '%s' from watcher[%s], subject: %s, group: %s",
-			message, remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
+		log.Debugf("received 'Pong' message '%s' from subscriber[%s], consumer: %s",
+			message, remoteAddr, wh.ConsumerID)
 		return nil
 	})
 	// CLOSE
-	wh.conn.SetCloseHandler(func(code int, text string) error {
-		log.Infof("watcher[%s] active closed, code: %d, message: '%s', subject: %s, group: %s",
-			remoteAddr, code, text, wh.watcher.Subject(), wh.watcher.Group())
+	wh.Conn.SetCloseHandler(func(code int, text string) error {
+		log.Infof("subscriber[%s] active closed, code: %d, message: '%s', consumer: %s",
+			remoteAddr, code, text, wh.ConsumerID)
 		return wh.sendClose(code, text)
 	})
+}
 
-	wh.conn.SetReadLimit(connection.ReadMaxBody)
-	err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
+func (wh *WebSocket) ReadMessage() error {
+	wh.Conn.SetReadLimit(connection.ReadMaxBody)
+	err := wh.Conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
 	if err != nil {
 		log.Error("", err)
 	}
 	for {
-		_, _, err := wh.conn.ReadMessage()
+		_, _, err := wh.Conn.ReadMessage()
 		if err != nil {
-			// client close or conn broken
-			wh.watcher.SetError(err)
-			return
+			return err
 		}
 	}
 }
 
 func (wh *WebSocket) sendClose(code int, text string) error {
-	remoteAddr := wh.conn.RemoteAddr().String()
+	remoteAddr := wh.Conn.RemoteAddr().String()
 	var message []byte
 	if code != websocket.CloseNoStatusReceived {
 		message = websocket.FormatCloseMessage(code, text)
 	}
-	err := wh.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(wh.SendTimeout))
+	err := wh.Conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(wh.SendTimeout))
 	if err != nil {
-		log.Errorf(err, "watcher[%s] catch an err, subject: %s, group: %s",
-			remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
+		log.Errorf(err, "subscriber[%s] catch an err, consumer: %s",
+			remoteAddr, wh.ConsumerID)
 		return err
 	}
 	return nil
 }
 
-// Pick will be called by runner
-func (wh *WebSocket) Pick() interface{} {
+// NeedCheck will be called by checker
+func (wh *WebSocket) NeedCheck() interface{} {
 	select {
-	case <-wh.Ready():
-		if wh.watcher.Err() != nil {
-			return wh.watcher.Err()
-		}
-
+	case <-wh.Idle():
 		select {
 		case t := <-wh.ticker.C:
 			return t
-		case j := <-wh.watcher.Job:
-			if j == nil {
-				return fmt.Errorf("server shutdown")
-			}
-			return j
 		default:
-			// reset if idle
-			wh.SetReady()
+			// reset if idleCh
+			wh.SetIdle()
 		}
 	default:
 	}
-
 	return nil
 }
 
-// HandleEvent will be called if Pick() returns not nil
-func (wh *WebSocket) HandleEvent(o interface{}) {
-	defer wh.SetReady()
-
-	var remoteAddr = wh.conn.RemoteAddr().String()
-	switch o := o.(type) {
-	case error:
-		log.Errorf(o, "watcher[%s] catch an err, subject: %s, group: %s",
-			remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
-		_ = wh.write(util.StringToBytesWithNoCopy(fmt.Sprintf("watcher catch an err: %s", o.Error())))
-	case time.Time:
-		wh.Keepalive()
-	case *event.InstanceEvent:
-		wh.WriteInstanceEvent(o)
-	default:
-		log.Errorf(nil, "watcher[%s] unknown input %v, subject: %s, group: %s",
-			remoteAddr, o, wh.watcher.Subject(), wh.watcher.Group())
-	}
-}
+// CheckHealth will be called if NeedCheck() returns not nil
+func (wh *WebSocket) CheckHealth(ctx context.Context) error {
+	defer wh.SetIdle()
 
-func (wh *WebSocket) Keepalive() {
-	domainProject := util.ParseDomainProject(wh.ctx)
-	if !serviceUtil.ServiceExist(wh.ctx, domainProject, wh.watcher.Group()) {
-		_ = wh.write(util.StringToBytesWithNoCopy("Service does not exit."))
-		return
+	if !wh.needPing {
+		return nil
 	}
 
-	if !wh.needPingWatcher {
-		return
+	if !serviceUtil.ServiceExist(ctx, wh.DomainProject, wh.ConsumerID) {
+		return fmt.Errorf("Service does not exist.")
 	}
 
-	remoteAddr := wh.conn.RemoteAddr().String()
+	remoteAddr := wh.Conn.RemoteAddr().String()
 	if err := wh.WritePingPong(websocket.PingMessage); err != nil {
-		log.Errorf(err, "send 'Ping' message to watcher[%s] failed, subject: %s, group: %s",
-			remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
-		return
+		log.Errorf(err, "send 'Ping' message to subscriber[%s] failed, consumer: %s",
+			remoteAddr, wh.ConsumerID)
+		return err
 	}
 
-	log.Debugf("send 'Ping' message to watcher[%s], subject: %s, group: %s",
-		remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
-	return
+	log.Debugf("send 'Ping' message to subscriber[%s], consumer: %s",
+		remoteAddr, wh.ConsumerID)
+	return nil
 }
 
 func (wh *WebSocket) WritePingPong(messageType int) error {
-	err := wh.conn.WriteControl(messageType, []byte{}, time.Now().Add(wh.SendTimeout))
+	err := wh.Conn.WriteControl(messageType, []byte{}, time.Now().Add(wh.SendTimeout))
 	if err != nil {
 		messageTypeName := "Ping"
 		if messageType == websocket.PongMessage {
 			messageTypeName = "Pong"
 		}
-		log.Errorf(err, "fail to send '%s' to watcher[%s], subject: %s, group: %s",
-			messageTypeName, wh.conn.RemoteAddr(), wh.watcher.Subject(), wh.watcher.Group())
-		//wh.watcher.SetError(err)
+		log.Errorf(err, "fail to send '%s' to subscriber[%s], consumer: %s",
+			messageTypeName, wh.Conn.RemoteAddr(), wh.ConsumerID)
+		//wh.subscriber.SetError(err)
 		return err
 	}
 	return nil
 }
 
-func (wh *WebSocket) WriteInstanceEvent(evt *event.InstanceEvent) {
-	resp := evt.Response
-	providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, resp.Key.ServiceName, resp.Key.Version)
-	if resp.Action != string(pb.EVT_EXPIRE) {
-		providerFlag = fmt.Sprintf("%s/%s(%s)", resp.Instance.ServiceId, resp.Instance.InstanceId, providerFlag)
-	}
-	remoteAddr := wh.conn.RemoteAddr().String()
-	log.Infof("event[%s] is coming in, watcher[%s] watch %s, subject: %s, group: %s",
-		resp.Action, remoteAddr, providerFlag, wh.watcher.Subject(), wh.watcher.Group())
-
-	resp.Response = nil
-	data, err := json.Marshal(resp)
-	if err != nil {
-		log.Errorf(err, "watcher[%s] watch %s, subject: %s, group: %s",
-			remoteAddr, providerFlag, evt, wh.watcher.Subject(), wh.watcher.Group())
-		data = util.StringToBytesWithNoCopy(fmt.Sprintf("marshal output file error, %s", err.Error()))
-	}
-	err = wh.write(data)
-	connection.ReportPublishCompleted(evt, err)
-}
-
-func (wh *WebSocket) write(message []byte) error {
-	select {
-	case <-wh.closed:
-		return nil
-	default:
-	}
-
-	err := wh.conn.SetWriteDeadline(time.Now().Add(wh.SendTimeout))
+func (wh *WebSocket) WriteTextMessage(message []byte) error {
+	err := wh.Conn.SetWriteDeadline(time.Now().Add(wh.SendTimeout))
 	if err != nil {
 		return err
 	}
-	err = wh.conn.WriteMessage(websocket.TextMessage, message)
+	err = wh.Conn.WriteMessage(websocket.TextMessage, message)
 	if err != nil {
-		log.Errorf(err, "watcher[%s] catch an err, subject: %s, group: %s",
-			wh.conn.RemoteAddr().String(), wh.watcher.Subject(), wh.watcher.Group())
+		log.Errorf(err, "subscriber[%s] catch an err, msg size: %d",
+			wh.Conn.RemoteAddr().String(), len(message))
 	}
 	return err
 }
 
-func (wh *WebSocket) Ready() <-chan struct{} {
-	return wh.free
+func (wh *WebSocket) Idle() <-chan struct{} {
+	return wh.idleCh
 }
 
-func (wh *WebSocket) SetReady() {
+func (wh *WebSocket) SetIdle() {
 	select {
-	case wh.free <- struct{}{}:
+	case wh.idleCh <- struct{}{}:
 	default:
 	}
 }
 
-func (wh *WebSocket) Stop() {
-	close(wh.closed)
-}
-
-func New(ctx context.Context, conn *websocket.Conn, watcher *event.InstanceSubscriber) *WebSocket {
-	return &WebSocket{
-		Options: ToOptions(),
-		ctx:     ctx,
-		conn:    conn,
-		watcher: watcher,
+func NewWebSocket(domainProject, serviceID string, conn *websocket.Conn) *WebSocket {
+	ws := &WebSocket{
+		Options:       ToOptions(),
+		DomainProject: domainProject,
+		ConsumerID:    serviceID,
+		Conn:          conn,
 	}
+	ws.Init()
+	return ws
 }
diff --git a/server/connection/ws/websocket_test.go b/server/connection/ws/websocket_test.go
index 8338b8f..ca40bd6 100644
--- a/server/connection/ws/websocket_test.go
+++ b/server/connection/ws/websocket_test.go
@@ -16,118 +16,149 @@
  */
 package ws_test
 
-// initialize
-import _ "github.com/apache/servicecomb-service-center/test"
 import (
+	_ "github.com/apache/servicecomb-service-center/test"
+
 	"context"
-	"errors"
-	"github.com/apache/servicecomb-service-center/pkg/registry"
-	wss "github.com/apache/servicecomb-service-center/server/connection/ws"
-	"github.com/apache/servicecomb-service-center/server/core"
-	"github.com/apache/servicecomb-service-center/server/core/proto"
-	"github.com/apache/servicecomb-service-center/server/event"
-	"github.com/gorilla/websocket"
 	"net/http"
 	"net/http/httptest"
 	"strings"
 	"testing"
 	"time"
+
+	wss "github.com/apache/servicecomb-service-center/server/connection/ws"
+	"github.com/apache/servicecomb-service-center/server/core"
+	"github.com/apache/servicecomb-service-center/server/event"
+	"github.com/gorilla/websocket"
+	"github.com/stretchr/testify/assert"
 )
 
 var closeCh = make(chan struct{})
 
-type watcherConn struct {
-}
-
 func init() {
 	testing.Init()
 	core.Initialize()
 }
+
+type watcherConn struct {
+	ClientConn *websocket.Conn
+	ServerConn *websocket.Conn
+}
+
+func (h *watcherConn) Test() {
+	s := httptest.NewServer(h)
+	h.ClientConn, _, _ = websocket.DefaultDialer.Dial(
+		strings.Replace(s.URL, "http://", "ws://", 1), nil)
+}
+
 func (h *watcherConn) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	var upgrader = websocket.Upgrader{}
-	conn, _ := upgrader.Upgrade(w, r, nil)
+	h.ServerConn, _ = upgrader.Upgrade(w, r, nil)
 	for {
-		conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
-		conn.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(time.Second))
-		_, _, err := conn.ReadMessage()
+		//h.ServerConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
+		//h.ServerConn.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(time.Second))
+		_, _, err := h.ServerConn.ReadMessage()
 		if err != nil {
 			return
 		}
 		<-closeCh
-		conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second))
-		conn.Close()
+		h.ServerConn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second))
+		h.ServerConn.Close()
 		return
 	}
 }
 
-func TestDoWebSocketListAndWatch(t *testing.T) {
-	s := httptest.NewServer(&watcherConn{})
-
-	conn, _, _ := websocket.DefaultDialer.Dial(
-		strings.Replace(s.URL, "http://", "ws://", 1), nil)
-
-	wss.SendEstablishError(conn, errors.New("error"))
+func NewTest() *watcherConn {
+	ts := &watcherConn{}
+	ts.Test()
+	return ts
+}
 
-	w := event.NewInstanceSubscriber("g", "s", func() (results []*registry.WatchInstanceResponse, rev int64) {
-		results = append(results, &registry.WatchInstanceResponse{
-			Response: proto.CreateResponse(proto.Response_SUCCESS, "ok"),
-			Action:   string(registry.EVT_CREATE),
-			Key:      &registry.MicroServiceKey{},
-			Instance: &registry.MicroServiceInstance{},
-		})
-		return
+func TestNewWebSocket(t *testing.T) {
+	mock := NewTest()
+	t.Run("should return not nil when new", func(t *testing.T) {
+		assert.NotNil(t, wss.NewWebSocket("", "", mock.ServerConn))
 	})
+}
 
-	ws := wss.New(context.Background(), conn, w)
-	err := ws.Init()
-	if err != nil {
-		t.Fatalf("TestPublisher_Run")
+func TestWebSocket_NeedCheck(t *testing.T) {
+	mock := NewTest()
+	conn := mock.ServerConn
+	options := wss.ToOptions()
+	webSocket := &wss.WebSocket{
+		Options:       options,
+		DomainProject: "default",
+		ConsumerID:    "",
+		Conn:          conn,
 	}
 
-	event.Center().Start()
-
-	go func() {
-		wss.ListAndWatch(context.Background(), "", nil, conn)
-
-		w2 := event.NewInstanceSubscriber("g", "s", func() (results []*registry.WatchInstanceResponse, rev int64) {
-			return
-		})
-		ws2 := wss.New(context.Background(), conn, w2)
-		err := ws2.Init()
-		if err != nil {
-			t.Fatalf("TestPublisher_Run")
-		}
-	}()
-
-	go ws.HandleControlMessage()
-
-	w.OnMessage(nil)
-	w.OnMessage(&event.InstanceEvent{})
-
-	event.Center().Fire(event.NewInstanceEvent("g", "s", 1, &registry.WatchInstanceResponse{
-		Response: proto.CreateResponse(proto.Response_SUCCESS, "ok"),
-		Action:   string(registry.EVT_CREATE),
-		Key:      &registry.MicroServiceKey{},
-		Instance: &registry.MicroServiceInstance{},
-	}))
-
-	<-time.After(time.Second)
-
-	ws.HandleEvent(nil)
-
-	ws.WritePingPong(websocket.PingMessage)
-	ws.WritePingPong(websocket.PongMessage)
-
-	ws.HandleEvent(time.Now())
+	t.Run("should not check when new", func(t *testing.T) {
+		webSocket.HealthInterval = time.Second
+		webSocket.Init()
+		assert.Nil(t, webSocket.NeedCheck())
+	})
 
-	closeCh <- struct{}{}
+	t.Run("should check when check time up", func(t *testing.T) {
+		webSocket.HealthInterval = time.Microsecond
+		webSocket.Init()
+		<-time.After(time.Microsecond)
+		assert.NotNil(t, webSocket.NeedCheck())
+	})
+	t.Run("should not check when busy", func(t *testing.T) {
+		webSocket.HealthInterval = time.Microsecond
+		webSocket.Init()
+		<-time.After(time.Microsecond)
+		assert.NotNil(t, webSocket.NeedCheck())
+		assert.Nil(t, webSocket.NeedCheck())
+	})
+}
 
-	<-time.After(time.Second)
+func TestWebSocket_Idle(t *testing.T) {
+	mock := NewTest()
+	webSocket := wss.NewWebSocket("", "", mock.ServerConn)
 
-	ws.WritePingPong(websocket.PingMessage)
-	ws.WritePingPong(websocket.PongMessage)
+	t.Run("should idle when new", func(t *testing.T) {
+		select {
+		case <-webSocket.Idle():
+		default:
+			assert.Fail(t, "not idle")
+		}
+	})
+	t.Run("should idle when setIdle", func(t *testing.T) {
+		select {
+		case <-webSocket.Idle():
+			assert.Fail(t, "idle")
+		default:
+			webSocket.SetIdle()
+			select {
+			case <-webSocket.Idle():
+			default:
+				assert.Fail(t, "not idle")
+			}
+		}
+	})
+	t.Run("should idle when checkHealth", func(t *testing.T) {
+		_ = webSocket.CheckHealth(context.Background())
+		select {
+		case <-webSocket.Idle():
+		default:
+			assert.Fail(t, "not idle")
+		}
+	})
+}
 
-	w.OnMessage(nil)
+func TestWebSocket_CheckHealth(t *testing.T) {
+	mock := NewTest()
+	event.Center().Start()
 
-	wss.Runner().Stop()
+	t.Run("should do nothing when recv PING", func(t *testing.T) {
+		ws := wss.NewWebSocket("", "", mock.ServerConn)
+		mock.ClientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
+		<-time.After(time.Second)
+		assert.Nil(t, ws.CheckHealth(context.Background()))
+	})
+	t.Run("should return err when consumer not exist", func(t *testing.T) {
+		ws := wss.NewWebSocket("", "", mock.ServerConn)
+		assert.Equal(t, "Service does not exist.", ws.CheckHealth(context.Background()).Error())
+	})
 }
diff --git a/server/core/proto/services.go b/server/core/proto/services.go
index 12f6af4..37d82a1 100644
--- a/server/core/proto/services.go
+++ b/server/core/proto/services.go
@@ -28,7 +28,6 @@ type ServiceInstanceCtrlServerEx interface {
 	BatchFind(ctx context.Context, in *registry.BatchFindInstancesRequest) (*registry.BatchFindInstancesResponse, error)
 
 	WebSocketWatch(ctx context.Context, in *registry.WatchInstanceRequest, conn *websocket.Conn)
-	WebSocketListAndWatch(ctx context.Context, in *registry.WatchInstanceRequest, conn *websocket.Conn)
 
 	ClusterHealth(ctx context.Context) (*registry.GetInstancesResponse, error)
 }
diff --git a/server/event/instance_subscriber.go b/server/event/instance_subscriber.go
index c5fad5a..cb1895f 100644
--- a/server/event/instance_subscriber.go
+++ b/server/event/instance_subscriber.go
@@ -18,11 +18,8 @@
 package event
 
 import (
-	"context"
 	"github.com/apache/servicecomb-service-center/pkg/event"
-	"github.com/apache/servicecomb-service-center/pkg/gopool"
 	"github.com/apache/servicecomb-service-center/pkg/log"
-	pb "github.com/apache/servicecomb-service-center/pkg/registry"
 	"time"
 )
 
@@ -30,10 +27,7 @@ const AddJobTimeout = 1 * time.Second
 
 type InstanceSubscriber struct {
 	event.Subscriber
-	Job          chan *InstanceEvent
-	ListRevision int64
-	ListFunc     func() (results []*pb.WatchInstanceResponse, rev int64)
-	listCh       chan struct{}
+	Job chan *InstanceEvent
 }
 
 func (w *InstanceSubscriber) SetError(err error) {
@@ -50,19 +44,6 @@ func (w *InstanceSubscriber) OnAccept() {
 		return
 	}
 	log.Debugf("accepted by event service, %s watcher %s %s", w.Type(), w.Group(), w.Subject())
-	gopool.Go(w.listAndPublishJobs)
-}
-
-func (w *InstanceSubscriber) listAndPublishJobs(_ context.Context) {
-	defer close(w.listCh)
-	if w.ListFunc == nil {
-		return
-	}
-	results, rev := w.ListFunc()
-	w.ListRevision = rev
-	for _, response := range results {
-		w.sendMessage(NewInstanceEvent(w.Group(), w.Subject(), w.ListRevision, response))
-	}
 }
 
 //被通知
@@ -75,26 +56,6 @@ func (w *InstanceSubscriber) OnMessage(job event.Event) {
 	if !ok {
 		return
 	}
-
-	select {
-	case <-w.listCh:
-	default:
-		timer := time.NewTimer(w.Timeout())
-		select {
-		case <-w.listCh:
-			timer.Stop()
-		case <-timer.C:
-			log.Errorf(nil,
-				"the %s listwatcher %s %s is not ready[over %s], send the event %v",
-				w.Type(), w.Group(), w.Subject(), w.Timeout(), job)
-		}
-	}
-
-	if wJob.Revision <= w.ListRevision {
-		log.Warnf("unexpected event %s job is coming in, watcher %s %s, job is %v, current revision is %v",
-			w.Type(), w.Group(), w.Subject(), job, w.ListRevision)
-		return
-	}
 	w.sendMessage(wJob)
 }
 
@@ -123,13 +84,10 @@ func (w *InstanceSubscriber) Close() {
 	close(w.Job)
 }
 
-func NewInstanceSubscriber(serviceID, domainProject string,
-	listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *InstanceSubscriber {
+func NewInstanceSubscriber(serviceID, domainProject string) *InstanceSubscriber {
 	watcher := &InstanceSubscriber{
 		Subscriber: event.NewSubscriber(INSTANCE, domainProject, serviceID),
 		Job:        make(chan *InstanceEvent, INSTANCE.QueueSize()),
-		ListFunc:   listFunc,
-		listCh:     make(chan struct{}),
 	}
 	return watcher
 }
diff --git a/server/rest/controller/v3/instance_watcher.go b/server/rest/controller/v3/instance_watcher.go
index 7395c4c..6d0690b 100644
--- a/server/rest/controller/v3/instance_watcher.go
+++ b/server/rest/controller/v3/instance_watcher.go
@@ -28,6 +28,5 @@ type WatchService struct {
 func (this *WatchService) URLPatterns() []rest.Route {
 	return []rest.Route{
 		{rest.HTTPMethodGet, "/registry/v3/microservices/:serviceId/watcher", this.Watch},
-		{rest.HTTPMethodGet, "/registry/v3/microservices/:serviceId/listwatcher", this.ListAndWatch},
 	}
 }
diff --git a/server/rest/controller/v4/instance_watcher.go b/server/rest/controller/v4/instance_watcher.go
index 9c1b1e9..347603f 100644
--- a/server/rest/controller/v4/instance_watcher.go
+++ b/server/rest/controller/v4/instance_watcher.go
@@ -33,7 +33,6 @@ type WatchService struct {
 func (s *WatchService) URLPatterns() []rest.Route {
 	return []rest.Route{
 		{Method: rest.HTTPMethodGet, Path: "/v4/:project/registry/microservices/:serviceId/watcher", Func: s.Watch},
-		{Method: rest.HTTPMethodGet, Path: "/v4/:project/registry/microservices/:serviceId/listwatcher", Func: s.ListAndWatch},
 	}
 }
 
@@ -62,16 +61,3 @@ func (s *WatchService) Watch(w http.ResponseWriter, r *http.Request) {
 		SelfServiceId: r.URL.Query().Get(":serviceId"),
 	}, conn)
 }
-
-func (s *WatchService) ListAndWatch(w http.ResponseWriter, r *http.Request) {
-	conn, err := upgrade(w, r)
-	if err != nil {
-		return
-	}
-	defer conn.Close()
-
-	r.Method = "WATCHLIST"
-	core.InstanceAPI.WebSocketListAndWatch(r.Context(), &pb.WatchInstanceRequest{
-		SelfServiceId: r.URL.Query().Get(":serviceId"),
-	}, conn)
-}
diff --git a/server/service/watch.go b/server/service/watch.go
index 2163f03..b12cf82 100644
--- a/server/service/watch.go
+++ b/server/service/watch.go
@@ -48,7 +48,7 @@ func (s *InstanceService) Watch(in *pb.WatchInstanceRequest, stream proto.Servic
 		return err
 	}
 
-	return grpc.ListAndWatch(stream.Context(), in.SelfServiceId, nil, stream)
+	return grpc.Watch(stream.Context(), in.SelfServiceId, stream)
 }
 
 func (s *InstanceService) WebSocketWatch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) {
@@ -57,16 +57,5 @@ func (s *InstanceService) WebSocketWatch(ctx context.Context, in *pb.WatchInstan
 		ws.SendEstablishError(conn, err)
 		return
 	}
-	ws.ListAndWatch(ctx, in.SelfServiceId, nil, conn)
-}
-
-func (s *InstanceService) WebSocketListAndWatch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) {
-	log.Infof("new a web socket list and watch with service[%s]", in.SelfServiceId)
-	if err := s.WatchPreOpera(ctx, in); err != nil {
-		ws.SendEstablishError(conn, err)
-		return
-	}
-	ws.ListAndWatch(ctx, in.SelfServiceId, func() ([]*pb.WatchInstanceResponse, int64) {
-		return serviceUtil.QueryAllProvidersInstances(ctx, in.SelfServiceId)
-	}, conn)
+	ws.Watch(ctx, in.SelfServiceId, conn)
 }
diff --git a/server/service/watch_test.go b/server/service/watch_test.go
index fe2dfd1..64e3622 100644
--- a/server/service/watch_test.go
+++ b/server/service/watch_test.go
@@ -46,13 +46,6 @@ func TestInstanceService_WebSocketWatch(t *testing.T) {
 	instanceResource.WebSocketWatch(context.Background(), &pb.WatchInstanceRequest{}, nil)
 }
 
-func TestInstanceService_WebSocketListAndWatch(t *testing.T) {
-	defer func() {
-		recover()
-	}()
-	instanceResource.WebSocketListAndWatch(context.Background(), &pb.WatchInstanceRequest{}, nil)
-}
-
 var _ = Describe("'Instance' service", func() {
 	Describe("execute 'watch' operartion", func() {
 		var (