You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/05/17 12:00:20 UTC
[servicecomb-service-center] branch v1.x updated: SCB-2176 Refactor
websocket (#981)
This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch v1.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/v1.x by this push:
new db013af SCB-2176 Refactor websocket (#981)
db013af is described below
commit db013afc9c9d1673999190b9ceb967a08e673813
Author: little-cui <su...@qq.com>
AuthorDate: Mon May 17 20:00:11 2021 +0800
SCB-2176 Refactor websocket (#981)
* SCB-2176 Refactor websocket
* SCB-2176 Add UTs
---
integration/apis.go | 1 -
integration/instances_test.go | 12 -
server/connection/grpc/stream.go | 5 +-
server/connection/grpc/stream_test.go | 4 +-
server/connection/ws/broker.go | 81 +++++++
.../connection/ws/{options.go => broker_test.go} | 30 ++-
server/connection/ws/common.go | 33 ++-
.../services.go => connection/ws/common_test.go} | 35 ++-
.../ws/{keepalive.go => health_check.go} | 90 ++++----
.../ws/{options.go => health_check_test.go} | 29 ++-
server/connection/ws/options.go | 10 +-
server/connection/ws/websocket.go | 244 +++++++--------------
server/connection/ws/websocket_test.go | 187 +++++++++-------
server/core/proto/services.go | 1 -
server/event/instance_subscriber.go | 46 +---
server/rest/controller/v3/instance_watcher.go | 1 -
server/rest/controller/v4/instance_watcher.go | 14 --
server/service/watch.go | 15 +-
server/service/watch_test.go | 7 -
19 files changed, 412 insertions(+), 433 deletions(-)
diff --git a/integration/apis.go b/integration/apis.go
index d263a6b..f8aa164 100644
--- a/integration/apis.go
+++ b/integration/apis.go
@@ -47,7 +47,6 @@ var UPDATEINSTANCEMETADATA = "/v4/default/registry/microservices/:serviceId/inst
var UPDATEINSTANCESTATUS = "/v4/default/registry/microservices/:serviceId/instances/:instanceId/status"
var INSTANCEHEARTBEAT = "/v4/default/registry/microservices/:serviceId/instances/:instanceId/heartbeat"
var INSTANCEWATCHER = "/v4/default/registry/microservices/:serviceId/watcher"
-var INSTANCELISTWATCHER = "/v4/default/registry/microservices/:serviceId/listwatcher"
//Governance API's
var GETGOVERNANCESERVICEDETAILS = "/v4/default/govern/microservices/:serviceId"
diff --git a/integration/instances_test.go b/integration/instances_test.go
index 0f90d31..b84fbc4 100644
--- a/integration/instances_test.go
+++ b/integration/instances_test.go
@@ -599,18 +599,6 @@ var _ = Describe("MicroService Api Test", func() {
Expect(resp.StatusCode).To(Equal(http.StatusBadRequest))
})
- It("Call the listwatcher API ", func() {
- //This api gives 400 bad request for the integration test
- // as integration test is not able to make ws connection
- url := strings.Replace(INSTANCELISTWATCHER, ":serviceId", serviceId, 1)
- req, _ := http.NewRequest(GET, SCURL+url, nil)
- req.Header.Set("X-Domain-Name", "default")
- resp, err := scclient.Do(req)
- Expect(err).To(BeNil())
- defer resp.Body.Close()
-
- Expect(resp.StatusCode).To(Equal(http.StatusBadRequest))
- })
})
})
diff --git a/server/connection/grpc/stream.go b/server/connection/grpc/stream.go
index 0609bdb..0e5415e 100644
--- a/server/connection/grpc/stream.go
+++ b/server/connection/grpc/stream.go
@@ -21,7 +21,6 @@ import (
"context"
"errors"
"github.com/apache/servicecomb-service-center/pkg/log"
- pb "github.com/apache/servicecomb-service-center/pkg/registry"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/connection"
"github.com/apache/servicecomb-service-center/server/core/proto"
@@ -67,10 +66,10 @@ func Handle(watcher *event.InstanceSubscriber, stream proto.ServiceInstanceCtrl_
}
}
-func ListAndWatch(ctx context.Context, serviceID string, f func() ([]*pb.WatchInstanceResponse, int64), stream proto.ServiceInstanceCtrl_WatchServer) (err error) {
+func Watch(ctx context.Context, serviceID string, stream proto.ServiceInstanceCtrl_WatchServer) (err error) {
domainProject := util.ParseDomainProject(ctx)
domain := util.ParseDomain(ctx)
- watcher := event.NewInstanceSubscriber(serviceID, domainProject, f)
+ watcher := event.NewInstanceSubscriber(serviceID, domainProject)
err = event.Center().AddSubscriber(watcher)
if err != nil {
return
diff --git a/server/connection/grpc/stream_test.go b/server/connection/grpc/stream_test.go
index 6ee8eb3..c549b65 100644
--- a/server/connection/grpc/stream_test.go
+++ b/server/connection/grpc/stream_test.go
@@ -41,7 +41,7 @@ func (x *grpcWatchServer) Context() context.Context {
}
func TestHandleWatchJob(t *testing.T) {
- w := event.NewInstanceSubscriber("g", "s", nil)
+ w := event.NewInstanceSubscriber("g", "s")
w.Job <- nil
err := stream.Handle(w, &grpcWatchServer{})
if err == nil {
@@ -54,6 +54,6 @@ func TestHandleWatchJob(t *testing.T) {
func TestDoStreamListAndWatch(t *testing.T) {
defer log.Recover()
- err := stream.ListAndWatch(context.Background(), "s", nil, nil)
+ err := stream.Watch(context.Background(), "s", nil)
t.Fatal("TestDoStreamListAndWatch failed", err)
}
diff --git a/server/connection/ws/broker.go b/server/connection/ws/broker.go
new file mode 100644
index 0000000..e4901ff
--- /dev/null
+++ b/server/connection/ws/broker.go
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ws
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ pb "github.com/apache/servicecomb-service-center/pkg/registry"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/connection"
+ "github.com/apache/servicecomb-service-center/server/event"
+)
+
+type Broker struct {
+ consumer *WebSocket
+ producer *event.InstanceSubscriber
+}
+
+func (b *Broker) Listen(ctx context.Context) error {
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case instanceEvent, ok := <-b.producer.Job:
+ if !ok {
+ return fmt.Errorf("read producer[%v] event failed", b.producer.Group())
+ }
+ err := b.write(instanceEvent)
+ if err != nil {
+ log.Errorf(err, "write instance event to subscriber[%s] failed, group: %s",
+ b.consumer.RemoteAddr, b.producer.Group())
+ return err
+ }
+ }
+ }
+}
+func (b *Broker) write(evt *event.InstanceEvent) error {
+ resp := evt.Response
+ providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, resp.Key.ServiceName, resp.Key.Version)
+ if resp.Action != string(pb.EVT_EXPIRE) {
+ providerFlag = fmt.Sprintf("%s/%s(%s)", resp.Instance.ServiceId, resp.Instance.InstanceId, providerFlag)
+ }
+ remoteAddr := b.consumer.Conn.RemoteAddr().String()
+ log.Infof("event[%s] is coming in, subscriber[%s] watch %s, group: %s",
+ resp.Action, remoteAddr, providerFlag, b.producer.Group())
+
+ resp.Response = nil
+ data, err := json.Marshal(resp)
+ if err != nil {
+ log.Errorf(err, "subscriber[%s] watch %s, group: %s",
+ remoteAddr, providerFlag, b.producer.Group())
+ data = util.StringToBytesWithNoCopy(fmt.Sprintf("marshal output file error, %s", err.Error()))
+ }
+ err = b.consumer.WriteTextMessage(data)
+ connection.ReportPublishCompleted(evt, err)
+ return err
+}
+
+func NewBroker(ws *WebSocket, is *event.InstanceSubscriber) *Broker {
+ return &Broker{
+ consumer: ws,
+ producer: is,
+ }
+}
diff --git a/server/connection/ws/options.go b/server/connection/ws/broker_test.go
similarity index 59%
copy from server/connection/ws/options.go
copy to server/connection/ws/broker_test.go
index 5817ef0..a3597c3 100644
--- a/server/connection/ws/options.go
+++ b/server/connection/ws/broker_test.go
@@ -15,22 +15,28 @@
* limitations under the License.
*/
-package ws
+package ws_test
import (
- "time"
-
- "github.com/apache/servicecomb-service-center/server/connection"
+ "context"
+ "github.com/apache/servicecomb-service-center/server/connection/ws"
+ "github.com/apache/servicecomb-service-center/server/event"
+ "github.com/stretchr/testify/assert"
+ "testing"
)
-type Options struct {
- ReadTimeout time.Duration
- SendTimeout time.Duration
+func TestNewBroker(t *testing.T) {
+ t.Run("should not return nil when new broker", func(t *testing.T) {
+ assert.NotNil(t, ws.NewBroker(nil, nil))
+
+ })
}
-func ToOptions() Options {
- return Options{
- ReadTimeout: connection.ReadTimeout,
- SendTimeout: connection.SendTimeout,
- }
+func TestBroker_Listen(t *testing.T) {
+ t.Run("should return err when listen context cancelled", func(t *testing.T) {
+ broker := ws.NewBroker(nil, event.NewInstanceSubscriber("", ""))
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+ assert.Equal(t, context.Canceled, broker.Listen(ctx))
+ })
}
diff --git a/server/connection/ws/common.go b/server/connection/ws/common.go
index 15a060d..0132202 100644
--- a/server/connection/ws/common.go
+++ b/server/connection/ws/common.go
@@ -19,32 +19,43 @@ package ws
import (
"context"
+ "fmt"
+ "github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/pkg/registry"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/connection"
"github.com/apache/servicecomb-service-center/server/event"
"github.com/gorilla/websocket"
)
-func ListAndWatch(ctx context.Context, serviceID string, f func() ([]*registry.WatchInstanceResponse, int64), conn *websocket.Conn) {
+func Watch(ctx context.Context, serviceID string, conn *websocket.Conn) {
domainProject := util.ParseDomainProject(ctx)
domain := util.ParseDomain(ctx)
- socket := New(ctx, conn, event.NewInstanceSubscriber(serviceID, domainProject, f))
- connection.ReportSubscriber(domain, Websocket, 1)
- process(socket)
- connection.ReportSubscriber(domain, Websocket, -1)
-}
+ ws := NewWebSocket(domainProject, serviceID, conn)
+ HealthChecker().Accept(ws)
-func process(socket *WebSocket) {
- if err := socket.Init(); err != nil {
+ subscriber := event.NewInstanceSubscriber(serviceID, domainProject)
+ err := event.Center().AddSubscriber(subscriber)
+ if err != nil {
+ SendEstablishError(conn, err)
return
}
- socket.HandleControlMessage()
+ connection.ReportSubscriber(domain, Websocket, 1)
+ defer connection.ReportSubscriber(domain, Websocket, -1)
- socket.Stop()
+ pool := gopool.New(ctx).Do(func(ctx context.Context) {
+ if err := NewBroker(ws, subscriber).Listen(ctx); err != nil {
+ log.Error(fmt.Sprintf("[%s] listen service[%s] failed", conn.RemoteAddr(), serviceID), err)
+ }
+ })
+ defer pool.Done()
+
+ if err := ws.ReadMessage(); err != nil {
+ log.Error(fmt.Sprintf("[%s] handle service[%s] control message failed", conn.RemoteAddr(), serviceID), err)
+ subscriber.SetError(err)
+ }
}
func SendEstablishError(conn *websocket.Conn, err error) {
diff --git a/server/core/proto/services.go b/server/connection/ws/common_test.go
similarity index 53%
copy from server/core/proto/services.go
copy to server/connection/ws/common_test.go
index 12f6af4..df8ae55 100644
--- a/server/core/proto/services.go
+++ b/server/connection/ws/common_test.go
@@ -14,21 +14,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package proto
+
+package ws_test
import (
"context"
- "github.com/apache/servicecomb-service-center/pkg/registry"
- "github.com/gorilla/websocket"
-)
-
-type ServiceInstanceCtrlServerEx interface {
- ServiceInstanceCtrlServer
+ "errors"
+ "testing"
- BatchFind(ctx context.Context, in *registry.BatchFindInstancesRequest) (*registry.BatchFindInstancesResponse, error)
+ wss "github.com/apache/servicecomb-service-center/server/connection/ws"
+ "github.com/stretchr/testify/assert"
+)
- WebSocketWatch(ctx context.Context, in *registry.WatchInstanceRequest, conn *websocket.Conn)
- WebSocketListAndWatch(ctx context.Context, in *registry.WatchInstanceRequest, conn *websocket.Conn)
+func TestSendEstablishError(t *testing.T) {
+ mock := NewTest()
+ t.Run("should read the err when call", func(t *testing.T) {
+ wss.SendEstablishError(mock.ServerConn, errors.New("error"))
+ _, message, err := mock.ClientConn.ReadMessage()
+ assert.Nil(t, err)
+ assert.Equal(t, "error", string(message))
+ })
+}
- ClusterHealth(ctx context.Context) (*registry.GetInstancesResponse, error)
+func TestWatch(t *testing.T) {
+ t.Run("should return when ctx cancelled", func(t *testing.T) {
+ mock := NewTest()
+ mock.ServerConn.Close()
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+ wss.Watch(ctx, "", mock.ServerConn)
+ })
}
diff --git a/server/connection/ws/keepalive.go b/server/connection/ws/health_check.go
similarity index 57%
rename from server/connection/ws/keepalive.go
rename to server/connection/ws/health_check.go
index 3bd7e8b..aeb4f38 100644
--- a/server/connection/ws/keepalive.go
+++ b/server/connection/ws/health_check.go
@@ -19,39 +19,36 @@ package ws
import (
"context"
- "github.com/apache/servicecomb-service-center/pkg/gopool"
+ "fmt"
"sync"
"time"
+
+ "github.com/apache/servicecomb-service-center/pkg/gopool"
+ "github.com/apache/servicecomb-service-center/pkg/log"
)
-var runner *KeepaliveRunner
+var checker *HealthCheck
func init() {
- runner = NewRunner()
- runner.Run()
+ checker = NewHealthCheck()
+ checker.Run()
}
-type KeepaliveRunner struct {
+type HealthCheck struct {
wss []*WebSocket
lock sync.Mutex
goroutine *gopool.Pool
}
-func (wh *KeepaliveRunner) Run() {
- gopool.Go(runner.loop)
+func (wh *HealthCheck) Run() {
+ gopool.Go(checker.loop)
}
-func (wh *KeepaliveRunner) Stop() {
+func (wh *HealthCheck) Stop() {
wh.goroutine.Close(true)
}
-func (wh *KeepaliveRunner) dispatch(ws *WebSocket, payload interface{}) {
- wh.goroutine.Do(func(ctx context.Context) {
- ws.HandleEvent(payload)
- })
-}
-
-func (wh *KeepaliveRunner) loop(ctx context.Context) {
+func (wh *HealthCheck) loop(ctx context.Context) {
defer wh.Stop()
ticker := time.NewTicker(500 * time.Millisecond)
for {
@@ -60,49 +57,52 @@ func (wh *KeepaliveRunner) loop(ctx context.Context) {
// server shutdown
return
case <-ticker.C:
- var removes []int
- for i, ws := range wh.wss {
- if payload := ws.Pick(); payload != nil {
- if _, ok := payload.(error); ok {
- removes = append(removes, i)
- }
- wh.dispatch(ws, payload)
+ for _, ws := range wh.wss {
+ if t := ws.NeedCheck(); t == nil {
+ continue
}
+ wh.check(ws)
}
- if len(removes) == 0 {
- continue
- }
-
- wh.lock.Lock()
- var (
- news []*WebSocket
- s int
- )
- for _, e := range removes {
- news = append(news, wh.wss[s:e]...)
- s = e + 1
- }
- if s < len(wh.wss) {
- news = append(news, wh.wss[s:]...)
- }
- wh.wss = news
- wh.lock.Unlock()
}
}
}
-func (wh *KeepaliveRunner) Accept(ws *WebSocket) {
+func (wh *HealthCheck) check(ws *WebSocket) {
+ wh.goroutine.Do(func(ctx context.Context) {
+ if err := ws.CheckHealth(ctx); err != nil {
+ wh.Remove(ws)
+ log.Error(fmt.Sprintf("checker removed unhealth websocket[%s]", ws.RemoteAddr), err)
+ }
+ })
+}
+
+func (wh *HealthCheck) Accept(ws *WebSocket) int {
wh.lock.Lock()
wh.wss = append(wh.wss, ws)
+ n := len(wh.wss)
+ wh.lock.Unlock()
+ return n
+}
+
+func (wh *HealthCheck) Remove(ws *WebSocket) int {
+ wh.lock.Lock()
+ for i, t := range wh.wss {
+ if t == ws {
+ wh.wss = append(wh.wss[0:i], wh.wss[i+1:]...)
+ break
+ }
+ }
+ n := len(wh.wss)
wh.lock.Unlock()
+ return n
}
-func NewRunner() *KeepaliveRunner {
- return &KeepaliveRunner{
+func NewHealthCheck() *HealthCheck {
+ return &HealthCheck{
goroutine: gopool.New(context.Background()),
}
}
-func Runner() *KeepaliveRunner {
- return runner
+func HealthChecker() *HealthCheck {
+ return checker
}
diff --git a/server/connection/ws/options.go b/server/connection/ws/health_check_test.go
similarity index 62%
copy from server/connection/ws/options.go
copy to server/connection/ws/health_check_test.go
index 5817ef0..7ab0578 100644
--- a/server/connection/ws/options.go
+++ b/server/connection/ws/health_check_test.go
@@ -15,22 +15,27 @@
* limitations under the License.
*/
-package ws
+package ws_test
import (
- "time"
-
- "github.com/apache/servicecomb-service-center/server/connection"
+ "github.com/apache/servicecomb-service-center/server/connection/ws"
+ "github.com/stretchr/testify/assert"
+ "testing"
)
-type Options struct {
- ReadTimeout time.Duration
- SendTimeout time.Duration
+func TestNewHealthCheck(t *testing.T) {
+ t.Run("should not return nil when new", func(t *testing.T) {
+ assert.NotNil(t, ws.NewHealthCheck())
+ })
}
-func ToOptions() Options {
- return Options{
- ReadTimeout: connection.ReadTimeout,
- SendTimeout: connection.SendTimeout,
- }
+func TestHealthCheck_Run(t *testing.T) {
+ mock := NewTest()
+
+ t.Run("should return 1 when accept one ws", func(t *testing.T) {
+ check := ws.NewHealthCheck()
+ webSocket := ws.NewWebSocket("", "", mock.ServerConn)
+ assert.Equal(t, 1, check.Accept(webSocket))
+ assert.Equal(t, 0, check.Remove(webSocket))
+ })
}
diff --git a/server/connection/ws/options.go b/server/connection/ws/options.go
index 5817ef0..3196623 100644
--- a/server/connection/ws/options.go
+++ b/server/connection/ws/options.go
@@ -24,13 +24,15 @@ import (
)
type Options struct {
- ReadTimeout time.Duration
- SendTimeout time.Duration
+ ReadTimeout time.Duration
+ SendTimeout time.Duration
+ HealthInterval time.Duration
}
func ToOptions() Options {
return Options{
- ReadTimeout: connection.ReadTimeout,
- SendTimeout: connection.SendTimeout,
+ ReadTimeout: connection.ReadTimeout,
+ SendTimeout: connection.SendTimeout,
+ HealthInterval: connection.HeartbeatInterval,
}
}
diff --git a/server/connection/ws/websocket.go b/server/connection/ws/websocket.go
index fc6db9c..24a7e2b 100644
--- a/server/connection/ws/websocket.go
+++ b/server/connection/ws/websocket.go
@@ -19,13 +19,9 @@ package ws
import (
"context"
- "encoding/json"
"fmt"
"github.com/apache/servicecomb-service-center/pkg/log"
- pb "github.com/apache/servicecomb-service-center/pkg/registry"
- "github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/connection"
- "github.com/apache/servicecomb-service-center/server/event"
serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
"github.com/gorilla/websocket"
"time"
@@ -35,258 +31,182 @@ const Websocket = "Websocket"
type WebSocket struct {
Options
-
- ctx context.Context
- ticker *time.Ticker
- conn *websocket.Conn
- // watcher subscribe the notification service event
- watcher *event.InstanceSubscriber
- needPingWatcher bool
- free chan struct{}
- closed chan struct{}
+ Conn *websocket.Conn
+ RemoteAddr string
+ DomainProject string
+ ConsumerID string
+
+ ticker *time.Ticker
+ needPing bool
+ idleCh chan struct{}
}
-func (wh *WebSocket) Init() error {
- wh.ticker = time.NewTicker(connection.HeartbeatInterval)
- wh.needPingWatcher = true
- wh.free = make(chan struct{}, 1)
- wh.closed = make(chan struct{})
-
- wh.SetReady()
+func (wh *WebSocket) Init() {
+ wh.RemoteAddr = wh.Conn.RemoteAddr().String()
+ wh.ticker = time.NewTicker(wh.HealthInterval)
+ wh.needPing = true
+ wh.idleCh = make(chan struct{}, 1)
- remoteAddr := wh.conn.RemoteAddr().String()
+ wh.registerMessageHandler()
- // put in notification service queue
- if err := event.Center().AddSubscriber(wh.watcher); err != nil {
- err = fmt.Errorf("establish[%s] websocket watch failed: notify service error, %s",
- remoteAddr, err.Error())
- log.Errorf(nil, err.Error())
+ wh.SetIdle()
- werr := wh.conn.WriteMessage(websocket.TextMessage, util.StringToBytesWithNoCopy(err.Error()))
- if werr != nil {
- log.Errorf(werr, "establish[%s] websocket watch failed: write message failed.", remoteAddr)
- }
- return err
- }
-
- // put in runner queue
- Runner().Accept(wh)
-
- log.Debugf("start watching instance status, watcher[%s], subject: %s, group: %s",
- remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
- return nil
+ log.Debugf("start watching instance status, subscriber[%s], consumer: %s",
+ wh.RemoteAddr, wh.ConsumerID)
}
-func (wh *WebSocket) HandleControlMessage() {
- remoteAddr := wh.conn.RemoteAddr().String()
+func (wh *WebSocket) registerMessageHandler() {
+ remoteAddr := wh.RemoteAddr
// PING
- wh.conn.SetPingHandler(func(message string) error {
+ wh.Conn.SetPingHandler(func(message string) error {
defer func() {
- err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
+ err := wh.Conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
if err != nil {
log.Error("", err)
}
}()
- if wh.needPingWatcher {
- log.Infof("received 'Ping' message '%s' from watcher[%s], no longer send 'Ping' to it, subject: %s, group: %s",
- message, remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
+ if wh.needPing {
+ log.Infof("received 'Ping' message '%s' from subscriber[%s], no longer send 'Ping' to it, consumer: %s",
+ message, remoteAddr, wh.ConsumerID)
}
- wh.needPingWatcher = false
+ wh.needPing = false
return wh.WritePingPong(websocket.PongMessage)
})
// PONG
- wh.conn.SetPongHandler(func(message string) error {
+ wh.Conn.SetPongHandler(func(message string) error {
defer func() {
- err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
+ err := wh.Conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
if err != nil {
log.Error("", err)
}
}()
- log.Debugf("received 'Pong' message '%s' from watcher[%s], subject: %s, group: %s",
- message, remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
+ log.Debugf("received 'Pong' message '%s' from subscriber[%s], consumer: %s",
+ message, remoteAddr, wh.ConsumerID)
return nil
})
// CLOSE
- wh.conn.SetCloseHandler(func(code int, text string) error {
- log.Infof("watcher[%s] active closed, code: %d, message: '%s', subject: %s, group: %s",
- remoteAddr, code, text, wh.watcher.Subject(), wh.watcher.Group())
+ wh.Conn.SetCloseHandler(func(code int, text string) error {
+ log.Infof("subscriber[%s] active closed, code: %d, message: '%s', consumer: %s",
+ remoteAddr, code, text, wh.ConsumerID)
return wh.sendClose(code, text)
})
+}
- wh.conn.SetReadLimit(connection.ReadMaxBody)
- err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
+func (wh *WebSocket) ReadMessage() error {
+ wh.Conn.SetReadLimit(connection.ReadMaxBody)
+ err := wh.Conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
if err != nil {
log.Error("", err)
}
for {
- _, _, err := wh.conn.ReadMessage()
+ _, _, err := wh.Conn.ReadMessage()
if err != nil {
- // client close or conn broken
- wh.watcher.SetError(err)
- return
+ return err
}
}
}
func (wh *WebSocket) sendClose(code int, text string) error {
- remoteAddr := wh.conn.RemoteAddr().String()
+ remoteAddr := wh.Conn.RemoteAddr().String()
var message []byte
if code != websocket.CloseNoStatusReceived {
message = websocket.FormatCloseMessage(code, text)
}
- err := wh.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(wh.SendTimeout))
+ err := wh.Conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(wh.SendTimeout))
if err != nil {
- log.Errorf(err, "watcher[%s] catch an err, subject: %s, group: %s",
- remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
+ log.Errorf(err, "subscriber[%s] catch an err, consumer: %s",
+ remoteAddr, wh.ConsumerID)
return err
}
return nil
}
-// Pick will be called by runner
-func (wh *WebSocket) Pick() interface{} {
+// NeedCheck will be called by checker
+func (wh *WebSocket) NeedCheck() interface{} {
select {
- case <-wh.Ready():
- if wh.watcher.Err() != nil {
- return wh.watcher.Err()
- }
-
+ case <-wh.Idle():
select {
case t := <-wh.ticker.C:
return t
- case j := <-wh.watcher.Job:
- if j == nil {
- return fmt.Errorf("server shutdown")
- }
- return j
default:
- // reset if idle
- wh.SetReady()
+ // reset if idleCh
+ wh.SetIdle()
}
default:
}
-
return nil
}
-// HandleEvent will be called if Pick() returns not nil
-func (wh *WebSocket) HandleEvent(o interface{}) {
- defer wh.SetReady()
-
- var remoteAddr = wh.conn.RemoteAddr().String()
- switch o := o.(type) {
- case error:
- log.Errorf(o, "watcher[%s] catch an err, subject: %s, group: %s",
- remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
- _ = wh.write(util.StringToBytesWithNoCopy(fmt.Sprintf("watcher catch an err: %s", o.Error())))
- case time.Time:
- wh.Keepalive()
- case *event.InstanceEvent:
- wh.WriteInstanceEvent(o)
- default:
- log.Errorf(nil, "watcher[%s] unknown input %v, subject: %s, group: %s",
- remoteAddr, o, wh.watcher.Subject(), wh.watcher.Group())
- }
-}
+// CheckHealth will be called if NeedCheck() returns not nil
+func (wh *WebSocket) CheckHealth(ctx context.Context) error {
+ defer wh.SetIdle()
-func (wh *WebSocket) Keepalive() {
- domainProject := util.ParseDomainProject(wh.ctx)
- if !serviceUtil.ServiceExist(wh.ctx, domainProject, wh.watcher.Group()) {
- _ = wh.write(util.StringToBytesWithNoCopy("Service does not exit."))
- return
+ if !wh.needPing {
+ return nil
}
- if !wh.needPingWatcher {
- return
+ if !serviceUtil.ServiceExist(ctx, wh.DomainProject, wh.ConsumerID) {
+ return fmt.Errorf("Service does not exist.")
}
- remoteAddr := wh.conn.RemoteAddr().String()
+ remoteAddr := wh.Conn.RemoteAddr().String()
if err := wh.WritePingPong(websocket.PingMessage); err != nil {
- log.Errorf(err, "send 'Ping' message to watcher[%s] failed, subject: %s, group: %s",
- remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
- return
+ log.Errorf(err, "send 'Ping' message to subscriber[%s] failed, consumer: %s",
+ remoteAddr, wh.ConsumerID)
+ return err
}
- log.Debugf("send 'Ping' message to watcher[%s], subject: %s, group: %s",
- remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
- return
+ log.Debugf("send 'Ping' message to subscriber[%s], consumer: %s",
+ remoteAddr, wh.ConsumerID)
+ return nil
}
func (wh *WebSocket) WritePingPong(messageType int) error {
- err := wh.conn.WriteControl(messageType, []byte{}, time.Now().Add(wh.SendTimeout))
+ err := wh.Conn.WriteControl(messageType, []byte{}, time.Now().Add(wh.SendTimeout))
if err != nil {
messageTypeName := "Ping"
if messageType == websocket.PongMessage {
messageTypeName = "Pong"
}
- log.Errorf(err, "fail to send '%s' to watcher[%s], subject: %s, group: %s",
- messageTypeName, wh.conn.RemoteAddr(), wh.watcher.Subject(), wh.watcher.Group())
- //wh.watcher.SetError(err)
+ log.Errorf(err, "fail to send '%s' to subscriber[%s], consumer: %s",
+ messageTypeName, wh.Conn.RemoteAddr(), wh.ConsumerID)
+ //wh.subscriber.SetError(err)
return err
}
return nil
}
-func (wh *WebSocket) WriteInstanceEvent(evt *event.InstanceEvent) {
- resp := evt.Response
- providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, resp.Key.ServiceName, resp.Key.Version)
- if resp.Action != string(pb.EVT_EXPIRE) {
- providerFlag = fmt.Sprintf("%s/%s(%s)", resp.Instance.ServiceId, resp.Instance.InstanceId, providerFlag)
- }
- remoteAddr := wh.conn.RemoteAddr().String()
- log.Infof("event[%s] is coming in, watcher[%s] watch %s, subject: %s, group: %s",
- resp.Action, remoteAddr, providerFlag, wh.watcher.Subject(), wh.watcher.Group())
-
- resp.Response = nil
- data, err := json.Marshal(resp)
- if err != nil {
- log.Errorf(err, "watcher[%s] watch %s, subject: %s, group: %s",
- remoteAddr, providerFlag, evt, wh.watcher.Subject(), wh.watcher.Group())
- data = util.StringToBytesWithNoCopy(fmt.Sprintf("marshal output file error, %s", err.Error()))
- }
- err = wh.write(data)
- connection.ReportPublishCompleted(evt, err)
-}
-
-func (wh *WebSocket) write(message []byte) error {
- select {
- case <-wh.closed:
- return nil
- default:
- }
-
- err := wh.conn.SetWriteDeadline(time.Now().Add(wh.SendTimeout))
+func (wh *WebSocket) WriteTextMessage(message []byte) error {
+ err := wh.Conn.SetWriteDeadline(time.Now().Add(wh.SendTimeout))
if err != nil {
return err
}
- err = wh.conn.WriteMessage(websocket.TextMessage, message)
+ err = wh.Conn.WriteMessage(websocket.TextMessage, message)
if err != nil {
- log.Errorf(err, "watcher[%s] catch an err, subject: %s, group: %s",
- wh.conn.RemoteAddr().String(), wh.watcher.Subject(), wh.watcher.Group())
+ log.Errorf(err, "subscriber[%s] catch an err, msg size: %d",
+ wh.Conn.RemoteAddr().String(), len(message))
}
return err
}
-func (wh *WebSocket) Ready() <-chan struct{} {
- return wh.free
+func (wh *WebSocket) Idle() <-chan struct{} {
+ return wh.idleCh
}
-func (wh *WebSocket) SetReady() {
+func (wh *WebSocket) SetIdle() {
select {
- case wh.free <- struct{}{}:
+ case wh.idleCh <- struct{}{}:
default:
}
}
-func (wh *WebSocket) Stop() {
- close(wh.closed)
-}
-
-func New(ctx context.Context, conn *websocket.Conn, watcher *event.InstanceSubscriber) *WebSocket {
- return &WebSocket{
- Options: ToOptions(),
- ctx: ctx,
- conn: conn,
- watcher: watcher,
+func NewWebSocket(domainProject, serviceID string, conn *websocket.Conn) *WebSocket {
+ ws := &WebSocket{
+ Options: ToOptions(),
+ DomainProject: domainProject,
+ ConsumerID: serviceID,
+ Conn: conn,
}
+ ws.Init()
+ return ws
}
diff --git a/server/connection/ws/websocket_test.go b/server/connection/ws/websocket_test.go
index 8338b8f..ca40bd6 100644
--- a/server/connection/ws/websocket_test.go
+++ b/server/connection/ws/websocket_test.go
@@ -16,118 +16,149 @@
*/
package ws_test
-// initialize
-import _ "github.com/apache/servicecomb-service-center/test"
import (
+ _ "github.com/apache/servicecomb-service-center/test"
+
"context"
- "errors"
- "github.com/apache/servicecomb-service-center/pkg/registry"
- wss "github.com/apache/servicecomb-service-center/server/connection/ws"
- "github.com/apache/servicecomb-service-center/server/core"
- "github.com/apache/servicecomb-service-center/server/core/proto"
- "github.com/apache/servicecomb-service-center/server/event"
- "github.com/gorilla/websocket"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
+
+ wss "github.com/apache/servicecomb-service-center/server/connection/ws"
+ "github.com/apache/servicecomb-service-center/server/core"
+ "github.com/apache/servicecomb-service-center/server/event"
+ "github.com/gorilla/websocket"
+ "github.com/stretchr/testify/assert"
)
var closeCh = make(chan struct{})
-type watcherConn struct {
-}
-
func init() {
testing.Init()
core.Initialize()
}
+
+type watcherConn struct {
+ ClientConn *websocket.Conn
+ ServerConn *websocket.Conn
+}
+
+func (h *watcherConn) Test() {
+ s := httptest.NewServer(h)
+ h.ClientConn, _, _ = websocket.DefaultDialer.Dial(
+ strings.Replace(s.URL, "http://", "ws://", 1), nil)
+}
+
func (h *watcherConn) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var upgrader = websocket.Upgrader{}
- conn, _ := upgrader.Upgrade(w, r, nil)
+ h.ServerConn, _ = upgrader.Upgrade(w, r, nil)
for {
- conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
- conn.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(time.Second))
- _, _, err := conn.ReadMessage()
+ //h.ServerConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
+ //h.ServerConn.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(time.Second))
+ _, _, err := h.ServerConn.ReadMessage()
if err != nil {
return
}
<-closeCh
- conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second))
- conn.Close()
+ h.ServerConn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second))
+ h.ServerConn.Close()
return
}
}
-func TestDoWebSocketListAndWatch(t *testing.T) {
- s := httptest.NewServer(&watcherConn{})
-
- conn, _, _ := websocket.DefaultDialer.Dial(
- strings.Replace(s.URL, "http://", "ws://", 1), nil)
-
- wss.SendEstablishError(conn, errors.New("error"))
+func NewTest() *watcherConn {
+ ts := &watcherConn{}
+ ts.Test()
+ return ts
+}
- w := event.NewInstanceSubscriber("g", "s", func() (results []*registry.WatchInstanceResponse, rev int64) {
- results = append(results, ®istry.WatchInstanceResponse{
- Response: proto.CreateResponse(proto.Response_SUCCESS, "ok"),
- Action: string(registry.EVT_CREATE),
- Key: ®istry.MicroServiceKey{},
- Instance: ®istry.MicroServiceInstance{},
- })
- return
+func TestNewWebSocket(t *testing.T) {
+ mock := NewTest()
+ t.Run("should return not nil when new", func(t *testing.T) {
+ assert.NotNil(t, wss.NewWebSocket("", "", mock.ServerConn))
})
+}
- ws := wss.New(context.Background(), conn, w)
- err := ws.Init()
- if err != nil {
- t.Fatalf("TestPublisher_Run")
+func TestWebSocket_NeedCheck(t *testing.T) {
+ mock := NewTest()
+ conn := mock.ServerConn
+ options := wss.ToOptions()
+ webSocket := &wss.WebSocket{
+ Options: options,
+ DomainProject: "default",
+ ConsumerID: "",
+ Conn: conn,
}
- event.Center().Start()
-
- go func() {
- wss.ListAndWatch(context.Background(), "", nil, conn)
-
- w2 := event.NewInstanceSubscriber("g", "s", func() (results []*registry.WatchInstanceResponse, rev int64) {
- return
- })
- ws2 := wss.New(context.Background(), conn, w2)
- err := ws2.Init()
- if err != nil {
- t.Fatalf("TestPublisher_Run")
- }
- }()
-
- go ws.HandleControlMessage()
-
- w.OnMessage(nil)
- w.OnMessage(&event.InstanceEvent{})
-
- event.Center().Fire(event.NewInstanceEvent("g", "s", 1, ®istry.WatchInstanceResponse{
- Response: proto.CreateResponse(proto.Response_SUCCESS, "ok"),
- Action: string(registry.EVT_CREATE),
- Key: ®istry.MicroServiceKey{},
- Instance: ®istry.MicroServiceInstance{},
- }))
-
- <-time.After(time.Second)
-
- ws.HandleEvent(nil)
-
- ws.WritePingPong(websocket.PingMessage)
- ws.WritePingPong(websocket.PongMessage)
-
- ws.HandleEvent(time.Now())
+ t.Run("should not check when new", func(t *testing.T) {
+ webSocket.HealthInterval = time.Second
+ webSocket.Init()
+ assert.Nil(t, webSocket.NeedCheck())
+ })
- closeCh <- struct{}{}
+ t.Run("should check when check time up", func(t *testing.T) {
+ webSocket.HealthInterval = time.Microsecond
+ webSocket.Init()
+ <-time.After(time.Microsecond)
+ assert.NotNil(t, webSocket.NeedCheck())
+ })
+ t.Run("should not check when busy", func(t *testing.T) {
+ webSocket.HealthInterval = time.Microsecond
+ webSocket.Init()
+ <-time.After(time.Microsecond)
+ assert.NotNil(t, webSocket.NeedCheck())
+ assert.Nil(t, webSocket.NeedCheck())
+ })
+}
- <-time.After(time.Second)
+func TestWebSocket_Idle(t *testing.T) {
+ mock := NewTest()
+ webSocket := wss.NewWebSocket("", "", mock.ServerConn)
- ws.WritePingPong(websocket.PingMessage)
- ws.WritePingPong(websocket.PongMessage)
+ t.Run("should idle when new", func(t *testing.T) {
+ select {
+ case <-webSocket.Idle():
+ default:
+ assert.Fail(t, "not idle")
+ }
+ })
+ t.Run("should idle when setIdle", func(t *testing.T) {
+ select {
+ case <-webSocket.Idle():
+ assert.Fail(t, "idle")
+ default:
+ webSocket.SetIdle()
+ select {
+ case <-webSocket.Idle():
+ default:
+ assert.Fail(t, "not idle")
+ }
+ }
+ })
+ t.Run("should idle when checkHealth", func(t *testing.T) {
+ _ = webSocket.CheckHealth(context.Background())
+ select {
+ case <-webSocket.Idle():
+ default:
+ assert.Fail(t, "not idle")
+ }
+ })
+}
- w.OnMessage(nil)
+func TestWebSocket_CheckHealth(t *testing.T) {
+ mock := NewTest()
+ event.Center().Start()
- wss.Runner().Stop()
+ t.Run("should do nothing when recv PING", func(t *testing.T) {
+ ws := wss.NewWebSocket("", "", mock.ServerConn)
+ mock.ClientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
+ <-time.After(time.Second)
+ assert.Nil(t, ws.CheckHealth(context.Background()))
+ })
+ t.Run("should return err when consumer not exist", func(t *testing.T) {
+ ws := wss.NewWebSocket("", "", mock.ServerConn)
+ assert.Equal(t, "Service does not exist.", ws.CheckHealth(context.Background()).Error())
+ })
}
diff --git a/server/core/proto/services.go b/server/core/proto/services.go
index 12f6af4..37d82a1 100644
--- a/server/core/proto/services.go
+++ b/server/core/proto/services.go
@@ -28,7 +28,6 @@ type ServiceInstanceCtrlServerEx interface {
BatchFind(ctx context.Context, in *registry.BatchFindInstancesRequest) (*registry.BatchFindInstancesResponse, error)
WebSocketWatch(ctx context.Context, in *registry.WatchInstanceRequest, conn *websocket.Conn)
- WebSocketListAndWatch(ctx context.Context, in *registry.WatchInstanceRequest, conn *websocket.Conn)
ClusterHealth(ctx context.Context) (*registry.GetInstancesResponse, error)
}
diff --git a/server/event/instance_subscriber.go b/server/event/instance_subscriber.go
index c5fad5a..cb1895f 100644
--- a/server/event/instance_subscriber.go
+++ b/server/event/instance_subscriber.go
@@ -18,11 +18,8 @@
package event
import (
- "context"
"github.com/apache/servicecomb-service-center/pkg/event"
- "github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
- pb "github.com/apache/servicecomb-service-center/pkg/registry"
"time"
)
@@ -30,10 +27,7 @@ const AddJobTimeout = 1 * time.Second
type InstanceSubscriber struct {
event.Subscriber
- Job chan *InstanceEvent
- ListRevision int64
- ListFunc func() (results []*pb.WatchInstanceResponse, rev int64)
- listCh chan struct{}
+ Job chan *InstanceEvent
}
func (w *InstanceSubscriber) SetError(err error) {
@@ -50,19 +44,6 @@ func (w *InstanceSubscriber) OnAccept() {
return
}
log.Debugf("accepted by event service, %s watcher %s %s", w.Type(), w.Group(), w.Subject())
- gopool.Go(w.listAndPublishJobs)
-}
-
-func (w *InstanceSubscriber) listAndPublishJobs(_ context.Context) {
- defer close(w.listCh)
- if w.ListFunc == nil {
- return
- }
- results, rev := w.ListFunc()
- w.ListRevision = rev
- for _, response := range results {
- w.sendMessage(NewInstanceEvent(w.Group(), w.Subject(), w.ListRevision, response))
- }
}
//被通知
@@ -75,26 +56,6 @@ func (w *InstanceSubscriber) OnMessage(job event.Event) {
if !ok {
return
}
-
- select {
- case <-w.listCh:
- default:
- timer := time.NewTimer(w.Timeout())
- select {
- case <-w.listCh:
- timer.Stop()
- case <-timer.C:
- log.Errorf(nil,
- "the %s listwatcher %s %s is not ready[over %s], send the event %v",
- w.Type(), w.Group(), w.Subject(), w.Timeout(), job)
- }
- }
-
- if wJob.Revision <= w.ListRevision {
- log.Warnf("unexpected event %s job is coming in, watcher %s %s, job is %v, current revision is %v",
- w.Type(), w.Group(), w.Subject(), job, w.ListRevision)
- return
- }
w.sendMessage(wJob)
}
@@ -123,13 +84,10 @@ func (w *InstanceSubscriber) Close() {
close(w.Job)
}
-func NewInstanceSubscriber(serviceID, domainProject string,
- listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *InstanceSubscriber {
+func NewInstanceSubscriber(serviceID, domainProject string) *InstanceSubscriber {
watcher := &InstanceSubscriber{
Subscriber: event.NewSubscriber(INSTANCE, domainProject, serviceID),
Job: make(chan *InstanceEvent, INSTANCE.QueueSize()),
- ListFunc: listFunc,
- listCh: make(chan struct{}),
}
return watcher
}
diff --git a/server/rest/controller/v3/instance_watcher.go b/server/rest/controller/v3/instance_watcher.go
index 7395c4c..6d0690b 100644
--- a/server/rest/controller/v3/instance_watcher.go
+++ b/server/rest/controller/v3/instance_watcher.go
@@ -28,6 +28,5 @@ type WatchService struct {
func (this *WatchService) URLPatterns() []rest.Route {
return []rest.Route{
{rest.HTTPMethodGet, "/registry/v3/microservices/:serviceId/watcher", this.Watch},
- {rest.HTTPMethodGet, "/registry/v3/microservices/:serviceId/listwatcher", this.ListAndWatch},
}
}
diff --git a/server/rest/controller/v4/instance_watcher.go b/server/rest/controller/v4/instance_watcher.go
index 9c1b1e9..347603f 100644
--- a/server/rest/controller/v4/instance_watcher.go
+++ b/server/rest/controller/v4/instance_watcher.go
@@ -33,7 +33,6 @@ type WatchService struct {
func (s *WatchService) URLPatterns() []rest.Route {
return []rest.Route{
{Method: rest.HTTPMethodGet, Path: "/v4/:project/registry/microservices/:serviceId/watcher", Func: s.Watch},
- {Method: rest.HTTPMethodGet, Path: "/v4/:project/registry/microservices/:serviceId/listwatcher", Func: s.ListAndWatch},
}
}
@@ -62,16 +61,3 @@ func (s *WatchService) Watch(w http.ResponseWriter, r *http.Request) {
SelfServiceId: r.URL.Query().Get(":serviceId"),
}, conn)
}
-
-func (s *WatchService) ListAndWatch(w http.ResponseWriter, r *http.Request) {
- conn, err := upgrade(w, r)
- if err != nil {
- return
- }
- defer conn.Close()
-
- r.Method = "WATCHLIST"
- core.InstanceAPI.WebSocketListAndWatch(r.Context(), &pb.WatchInstanceRequest{
- SelfServiceId: r.URL.Query().Get(":serviceId"),
- }, conn)
-}
diff --git a/server/service/watch.go b/server/service/watch.go
index 2163f03..b12cf82 100644
--- a/server/service/watch.go
+++ b/server/service/watch.go
@@ -48,7 +48,7 @@ func (s *InstanceService) Watch(in *pb.WatchInstanceRequest, stream proto.Servic
return err
}
- return grpc.ListAndWatch(stream.Context(), in.SelfServiceId, nil, stream)
+ return grpc.Watch(stream.Context(), in.SelfServiceId, stream)
}
func (s *InstanceService) WebSocketWatch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) {
@@ -57,16 +57,5 @@ func (s *InstanceService) WebSocketWatch(ctx context.Context, in *pb.WatchInstan
ws.SendEstablishError(conn, err)
return
}
- ws.ListAndWatch(ctx, in.SelfServiceId, nil, conn)
-}
-
-func (s *InstanceService) WebSocketListAndWatch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) {
- log.Infof("new a web socket list and watch with service[%s]", in.SelfServiceId)
- if err := s.WatchPreOpera(ctx, in); err != nil {
- ws.SendEstablishError(conn, err)
- return
- }
- ws.ListAndWatch(ctx, in.SelfServiceId, func() ([]*pb.WatchInstanceResponse, int64) {
- return serviceUtil.QueryAllProvidersInstances(ctx, in.SelfServiceId)
- }, conn)
+ ws.Watch(ctx, in.SelfServiceId, conn)
}
diff --git a/server/service/watch_test.go b/server/service/watch_test.go
index fe2dfd1..64e3622 100644
--- a/server/service/watch_test.go
+++ b/server/service/watch_test.go
@@ -46,13 +46,6 @@ func TestInstanceService_WebSocketWatch(t *testing.T) {
instanceResource.WebSocketWatch(context.Background(), &pb.WatchInstanceRequest{}, nil)
}
-func TestInstanceService_WebSocketListAndWatch(t *testing.T) {
- defer func() {
- recover()
- }()
- instanceResource.WebSocketListAndWatch(context.Background(), &pb.WatchInstanceRequest{}, nil)
-}
-
var _ = Describe("'Instance' service", func() {
Describe("execute 'watch' operartion", func() {
var (