You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/08/03 00:55:18 UTC

[servicecomb-service-center] branch master updated: Fix: Clean code (#1120)

This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new f31c2a7  Fix: Clean code (#1120)
f31c2a7 is described below

commit f31c2a7291fac7ad26e58846785c7a41ac6c9f5e
Author: little-cui <su...@qq.com>
AuthorDate: Tue Aug 3 08:53:52 2021 +0800

    Fix: Clean code (#1120)
---
 datasource/dep.go                                  |   1 -
 datasource/etcd/dep.go                             |   4 -
 datasource/etcd/event/instance_event_handler.go    |   6 +-
 datasource/etcd/event/tag_event_handler.go         |   2 +-
 datasource/mongo/dep.go                            |   4 -
 datasource/mongo/event/instance_event_handler.go   |   6 +-
 server/api.go                                      |  76 +++++---------
 server/connection/grpc/stream.go                   |  83 ---------------
 server/connection/grpc/stream_test.go              |  60 -----------
 server/event/instance_event.go                     |  12 +--
 server/pubsub/pubsub.go                            |  58 +++++++++++
 .../disco/watch_test.go => pubsub/pubsub_test.go}  |  42 ++------
 server/{connection => pubsub}/ws/broker.go         |   0
 server/{connection => pubsub}/ws/broker_test.go    |   2 +-
 server/{connection => pubsub}/ws/common.go         |   0
 server/{connection => pubsub}/ws/common_test.go    |   6 +-
 server/{connection => pubsub/ws}/connection.go     |   2 +-
 server/{connection => pubsub}/ws/health_check.go   |   0
 .../{connection => pubsub}/ws/health_check_test.go |   2 +-
 server/{connection => pubsub}/ws/options.go        |   8 +-
 server/{connection => pubsub}/ws/websocket.go      |   3 +-
 server/{connection => pubsub}/ws/websocket_test.go |  14 +--
 server/rest/controller/v4/instance_watcher.go      |   7 +-
 server/server.go                                   |  35 +++----
 server/service/disco/watch.go                      | 113 ---------------------
 server/service/heartbeat/websocket.go              |  13 ++-
 26 files changed, 143 insertions(+), 416 deletions(-)

diff --git a/datasource/dep.go b/datasource/dep.go
index fb381c4..1be1222 100644
--- a/datasource/dep.go
+++ b/datasource/dep.go
@@ -28,6 +28,5 @@ type DependencyManager interface {
 	SearchProviderDependency(ctx context.Context, request *pb.GetDependenciesRequest) (*pb.GetProDependenciesResponse, error)
 	SearchConsumerDependency(ctx context.Context, request *pb.GetDependenciesRequest) (*pb.GetConDependenciesResponse, error)
 	AddOrUpdateDependencies(ctx context.Context, dependencyInfos []*pb.ConsumerDependency, override bool) (*pb.Response, error)
-	DeleteDependency()
 	DependencyHandle(ctx context.Context) error
 }
diff --git a/datasource/etcd/dep.go b/datasource/etcd/dep.go
index 9b5dc3d..88d5941 100644
--- a/datasource/etcd/dep.go
+++ b/datasource/etcd/dep.go
@@ -102,10 +102,6 @@ func (dm *DepManager) SearchConsumerDependency(ctx context.Context, request *pb.
 	}, nil
 }
 
-func (dm *DepManager) DeleteDependency() {
-	panic("implement me")
-}
-
 func (dm *DepManager) DependencyHandle(ctx context.Context) error {
 	var dep *event.DependencyEventHandler
 	err := dep.Handle()
diff --git a/datasource/etcd/event/instance_event_handler.go b/datasource/etcd/event/instance_event_handler.go
index 93d1bd5..980206f 100644
--- a/datasource/etcd/event/instance_event_handler.go
+++ b/datasource/etcd/event/instance_event_handler.go
@@ -102,14 +102,14 @@ func (h *InstanceEventHandler) OnEvent(evt sd.KvEvent) {
 		return
 	}
 
-	PublishInstanceEvent(evt, domainProject, pb.MicroServiceToKey(domainProject, ms), consumerIDs)
+	PublishInstanceEvent(evt, pb.MicroServiceToKey(domainProject, ms), consumerIDs)
 }
 
 func NewInstanceEventHandler() *InstanceEventHandler {
 	return &InstanceEventHandler{}
 }
 
-func PublishInstanceEvent(evt sd.KvEvent, domainProject string, serviceKey *pb.MicroServiceKey, subscribers []string) {
+func PublishInstanceEvent(evt sd.KvEvent, serviceKey *pb.MicroServiceKey, subscribers []string) {
 	defer cache.FindInstances.Remove(serviceKey)
 
 	if len(subscribers) == 0 {
@@ -123,7 +123,7 @@ func PublishInstanceEvent(evt sd.KvEvent, domainProject string, serviceKey *pb.M
 		Instance: evt.KV.Value.(*pb.MicroServiceInstance),
 	}
 	for _, consumerID := range subscribers {
-		evt := event.NewInstanceEventWithTime(consumerID, domainProject, evt.Revision, evt.CreateAt, response)
+		evt := event.NewInstanceEvent(consumerID, evt.Revision, evt.CreateAt, response)
 		err := event.Center().Fire(evt)
 		if err != nil {
 			log.Errorf(err, "publish event[%v] into channel failed", evt)
diff --git a/datasource/etcd/event/tag_event_handler.go b/datasource/etcd/event/tag_event_handler.go
index 17a2d31..65a402f 100644
--- a/datasource/etcd/event/tag_event_handler.go
+++ b/datasource/etcd/event/tag_event_handler.go
@@ -91,7 +91,7 @@ func (apt *TagsChangedTask) publish(ctx context.Context, domainProject, consumer
 		}
 
 		providerKey := pb.MicroServiceToKey(domainProject, provider)
-		PublishInstanceEvent(apt.KvEvent, domainProject, providerKey, []string{consumerID})
+		PublishInstanceEvent(apt.KvEvent, providerKey, []string{consumerID})
 	}
 	return nil
 }
diff --git a/datasource/mongo/dep.go b/datasource/mongo/dep.go
index 5e17ddb..7216350 100644
--- a/datasource/mongo/dep.go
+++ b/datasource/mongo/dep.go
@@ -173,10 +173,6 @@ func (ds *DepManager) AddOrUpdateDependencies(ctx context.Context, dependencys [
 	return discovery.CreateResponse(discovery.ResponseSuccess, "Create dependency successfully."), nil
 }
 
-func (ds *DepManager) DeleteDependency() {
-	panic("implement me")
-}
-
 func (ds *DepManager) DependencyHandle(ctx context.Context) (err error) {
 	return nil
 }
diff --git a/datasource/mongo/event/instance_event_handler.go b/datasource/mongo/event/instance_event_handler.go
index fe80ba7..fa9925c 100644
--- a/datasource/mongo/event/instance_event_handler.go
+++ b/datasource/mongo/event/instance_event_handler.go
@@ -83,14 +83,14 @@ func (h InstanceEventHandler) OnEvent(evt sd.MongoEvent) {
 			providerID, microService.Environment, microService.AppId, microService.ServiceName, microService.Version), err)
 		return
 	}
-	PublishInstanceEvent(evt, domainProject, discovery.MicroServiceToKey(domainProject, microService), consumerIDS)
+	PublishInstanceEvent(evt, discovery.MicroServiceToKey(domainProject, microService), consumerIDS)
 }
 
 func NewInstanceEventHandler() *InstanceEventHandler {
 	return &InstanceEventHandler{}
 }
 
-func PublishInstanceEvent(evt sd.MongoEvent, domainProject string, serviceKey *discovery.MicroServiceKey, subscribers []string) {
+func PublishInstanceEvent(evt sd.MongoEvent, serviceKey *discovery.MicroServiceKey, subscribers []string) {
 	if len(subscribers) == 0 {
 		return
 	}
@@ -101,7 +101,7 @@ func PublishInstanceEvent(evt sd.MongoEvent, domainProject string, serviceKey *d
 		Instance: evt.Value.(model.Instance).Instance,
 	}
 	for _, consumerID := range subscribers {
-		evt := event.NewInstanceEventWithTime(consumerID, domainProject, -1, simple.FromTime(time.Now()), response)
+		evt := event.NewInstanceEvent(consumerID, -1, simple.FromTime(time.Now()), response)
 		err := event.Center().Fire(evt)
 		if err != nil {
 			log.Error(fmt.Sprintf("publish event[%v] into channel failed", evt), err)
diff --git a/server/api.go b/server/api.go
index eb75d7a..0810428 100644
--- a/server/api.go
+++ b/server/api.go
@@ -21,7 +21,6 @@ import (
 	"context"
 	"fmt"
 	"net"
-	"strconv"
 	"time"
 
 	"github.com/apache/servicecomb-service-center/server/service/disco"
@@ -53,34 +52,16 @@ func InitAPI() {
 	core.ServiceAPI = disco.AssembleResources()
 }
 
-type APIType int64
-
-func (t APIType) String() string {
-	switch t {
-	case RPC:
-		return "grpc" // support grpc
-	case REST:
-		return "rest"
-	default:
-		return "SCHEME" + strconv.Itoa(int(t))
-	}
-}
-
 type APIServer struct {
-	Listeners map[APIType]string
+	Listeners  []string
+	HTTPServer *rest.Server
 
-	restSrv   *rest.Server
 	isClose   bool
 	forked    bool
 	err       chan error
 	goroutine *gopool.Pool
 }
 
-const (
-	RPC  APIType = 0
-	REST APIType = 1
-)
-
 func (s *APIServer) Err() <-chan error {
 	return s.err
 }
@@ -97,48 +78,43 @@ func (s *APIServer) MarkForked() {
 	s.forked = true
 }
 
-func (s *APIServer) AddListener(t APIType, ip, port string) {
-	if s.Listeners == nil {
-		s.Listeners = map[APIType]string{}
-	}
+func (s *APIServer) AddListener(ip, port string) {
 	if len(ip) == 0 {
 		return
 	}
-	s.Listeners[t] = net.JoinHostPort(ip, port)
+	s.Listeners = append(s.Listeners, net.JoinHostPort(ip, port))
 }
 
-func (s *APIServer) populateEndpoint(t APIType, ipPort string) {
+func (s *APIServer) populateEndpoint(ipPort string) {
 	if len(ipPort) == 0 {
 		return
 	}
-	address := fmt.Sprintf("%s://%s/", t, ipPort)
+	address := fmt.Sprintf("rest://%s/", ipPort)
 	if config.GetSSL().SslEnabled {
 		address += "?sslEnabled=true"
 	}
 	core.Instance.Endpoints = append(core.Instance.Endpoints, address)
 }
 
-func (s *APIServer) startRESTServer() (err error) {
-	addr, ok := s.Listeners[REST]
-	if !ok {
-		return
-	}
-	s.restSrv, err = rs.NewServer(addr)
-	if err != nil {
-		return
-	}
-	log.Infof("listen address: %s://%s", REST, s.restSrv.Listener.Addr().String())
-
-	s.populateEndpoint(REST, s.restSrv.Listener.Addr().String())
-
-	s.goroutine.Do(func(_ context.Context) {
-		err := s.restSrv.Serve()
-		if s.isClose {
+func (s *APIServer) serve() (err error) {
+	for i, addr := range s.Listeners {
+		s.HTTPServer, err = rs.NewServer(addr)
+		if err != nil {
 			return
 		}
-		log.Errorf(err, "error to start REST API server %s", addr)
-		s.err <- err
-	})
+		log.Infof("listen address[%d]: rest://%s", i, s.HTTPServer.Listener.Addr().String())
+
+		s.populateEndpoint(s.HTTPServer.Listener.Addr().String())
+
+		s.goroutine.Do(func(_ context.Context) {
+			err := s.HTTPServer.Serve()
+			if s.isClose {
+				return
+			}
+			log.Errorf(err, "error to serve %s", addr)
+			s.err <- err
+		})
+	}
 	return
 }
 
@@ -150,7 +126,7 @@ func (s *APIServer) Start() {
 
 	core.Instance.Endpoints = nil
 
-	err := s.startRESTServer()
+	err := s.serve()
 	if err != nil {
 		s.err <- err
 		return
@@ -179,8 +155,8 @@ func (s *APIServer) Stop() {
 		s.selfUnregister()
 	}
 
-	if s.restSrv != nil {
-		s.restSrv.Shutdown()
+	if s.HTTPServer != nil {
+		s.HTTPServer.Shutdown()
 	}
 
 	close(s.err)
diff --git a/server/connection/grpc/stream.go b/server/connection/grpc/stream.go
deleted file mode 100644
index b786c8f..0000000
--- a/server/connection/grpc/stream.go
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package grpc
-
-import (
-	"context"
-	"errors"
-	"time"
-
-	"github.com/apache/servicecomb-service-center/pkg/log"
-	"github.com/apache/servicecomb-service-center/pkg/proto"
-	"github.com/apache/servicecomb-service-center/pkg/util"
-	"github.com/apache/servicecomb-service-center/server/connection"
-	"github.com/apache/servicecomb-service-center/server/event"
-	"github.com/apache/servicecomb-service-center/server/metrics"
-)
-
-const GRPC = "gRPC"
-
-func Handle(watcher *event.InstanceSubscriber, stream proto.ServiceInstanceCtrlWatchServer) (err error) {
-	timer := time.NewTimer(connection.HeartbeatInterval)
-	defer timer.Stop()
-	for {
-		select {
-		case <-stream.Context().Done():
-			return
-		case <-timer.C:
-			timer.Reset(connection.HeartbeatInterval)
-		case job := <-watcher.Job:
-			if job == nil {
-				err = errors.New("channel is closed")
-				log.Errorf(err, "watcher caught an exception, subject: %s, group: %s",
-					watcher.Subject(), watcher.Group())
-				return
-			}
-			if job.Response == nil {
-				continue
-			}
-			resp := job.Response
-			log.Infof("event is coming in, watcher, subject: %s, group: %s",
-				watcher.Subject(), watcher.Group())
-
-			err = stream.Send(resp)
-			metrics.ReportPublishCompleted(job, err)
-			if err != nil {
-				log.Errorf(err, "send message error, subject: %s, group: %s",
-					watcher.Subject(), watcher.Group())
-				watcher.SetError(err)
-				return
-			}
-			util.ResetTimer(timer, connection.HeartbeatInterval)
-		}
-	}
-}
-
-func Watch(ctx context.Context, serviceID string, stream proto.ServiceInstanceCtrlWatchServer) (err error) {
-	domainProject := util.ParseDomainProject(ctx)
-	domain := util.ParseDomain(ctx)
-	watcher := event.NewInstanceSubscriber(serviceID, domainProject)
-	err = event.Center().AddSubscriber(watcher)
-	if err != nil {
-		return
-	}
-	metrics.ReportSubscriber(domain, GRPC, 1)
-	err = Handle(watcher, stream)
-	metrics.ReportSubscriber(domain, GRPC, -1)
-	return
-}
diff --git a/server/connection/grpc/stream_test.go b/server/connection/grpc/stream_test.go
deleted file mode 100644
index 3bb7d78..0000000
--- a/server/connection/grpc/stream_test.go
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package grpc_test
-
-import (
-	"context"
-	"testing"
-	"time"
-
-	"github.com/apache/servicecomb-service-center/pkg/log"
-	simple "github.com/apache/servicecomb-service-center/pkg/time"
-	stream "github.com/apache/servicecomb-service-center/server/connection/grpc"
-	"github.com/apache/servicecomb-service-center/server/event"
-	pb "github.com/go-chassis/cari/discovery"
-	"google.golang.org/grpc"
-)
-
-type grpcWatchServer struct {
-	grpc.ServerStream
-}
-
-func (x *grpcWatchServer) Send(m *pb.WatchInstanceResponse) error {
-	return nil
-}
-
-func (x *grpcWatchServer) Context() context.Context {
-	return context.Background()
-}
-
-func TestHandleWatchJob(t *testing.T) {
-	w := event.NewInstanceSubscriber("g", "s")
-	w.Job <- nil
-	err := stream.Handle(w, &grpcWatchServer{})
-	if err == nil {
-		t.Fatalf("TestHandleWatchJob failed")
-	}
-	w.Job <- event.NewInstanceEventWithTime("g", "s", 1, simple.FromTime(time.Now()), nil)
-	w.Job <- nil
-	stream.Handle(w, &grpcWatchServer{})
-}
-
-func TestDoStreamListAndWatch(t *testing.T) {
-	defer log.Recover()
-	err := stream.Watch(context.Background(), "s", nil)
-	t.Fatal("TestDoStreamListAndWatch failed", err)
-}
diff --git a/server/event/instance_event.go b/server/event/instance_event.go
index 74f30fc..9266573 100644
--- a/server/event/instance_event.go
+++ b/server/event/instance_event.go
@@ -34,17 +34,9 @@ type InstanceEvent struct {
 	Response *pb.WatchInstanceResponse
 }
 
-func NewInstanceEvent(serviceID, domainProject string, rev int64, response *pb.WatchInstanceResponse) *InstanceEvent {
+func NewInstanceEvent(serviceID string, rev int64, createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
 	return &InstanceEvent{
-		Event:    event.NewEvent(INSTANCE, domainProject, serviceID),
-		Revision: rev,
-		Response: response,
-	}
-}
-
-func NewInstanceEventWithTime(serviceID, domainProject string, rev int64, createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
-	return &InstanceEvent{
-		Event:    event.NewEventWithTime(INSTANCE, domainProject, serviceID, createAt),
+		Event:    event.NewEventWithTime(INSTANCE, response.Key.Tenant, serviceID, createAt),
 		Revision: rev,
 		Response: response,
 	}
diff --git a/server/pubsub/pubsub.go b/server/pubsub/pubsub.go
new file mode 100644
index 0000000..278387d
--- /dev/null
+++ b/server/pubsub/pubsub.go
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pubsub
+
+import (
+	"context"
+	"errors"
+
+	pb "github.com/go-chassis/cari/discovery"
+	"github.com/gorilla/websocket"
+
+	"github.com/apache/servicecomb-service-center/datasource"
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	"github.com/apache/servicecomb-service-center/server/pubsub/ws"
+)
+
+var ErrRequiredServiceID = errors.New("required the serviceID")
+
+// Watch listen the provider instance events by serviceID
+func Watch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) {
+	log.Infof("new a web socket watch with service[%s]", in.SelfServiceId)
+	if err := ExistService(ctx, in.SelfServiceId); err != nil {
+		ws.SendEstablishError(conn, err)
+		return
+	}
+	ws.Watch(ctx, in.SelfServiceId, conn)
+}
+func ExistService(ctx context.Context, selfServiceID string) error {
+	if len(selfServiceID) == 0 {
+		return ErrRequiredServiceID
+	}
+	resp, err := datasource.GetMetadataManager().ExistServiceByID(ctx, &pb.GetExistenceByIDRequest{
+		ServiceId: selfServiceID,
+	})
+	if err != nil {
+		log.Error("", err)
+		return err
+	}
+	if !resp.Exist {
+		return datasource.ErrServiceNotExists
+	}
+	return nil
+}
diff --git a/server/service/disco/watch_test.go b/server/pubsub/pubsub_test.go
similarity index 68%
rename from server/service/disco/watch_test.go
rename to server/pubsub/pubsub_test.go
index 46168d8..6edf1e8 100644
--- a/server/service/disco/watch_test.go
+++ b/server/pubsub/pubsub_test.go
@@ -14,37 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package disco_test
+package pubsub_test
 
 import (
 	"context"
-	"testing"
-
+	"github.com/apache/servicecomb-service-center/pkg/util"
+	"github.com/apache/servicecomb-service-center/server/pubsub"
 	"github.com/apache/servicecomb-service-center/server/service/disco"
+	"testing"
 
 	pb "github.com/go-chassis/cari/discovery"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
-	"google.golang.org/grpc"
 )
 
-type grpcWatchServer struct {
-	grpc.ServerStream
-}
-
-func (x *grpcWatchServer) Send(m *pb.WatchInstanceResponse) error {
-	return nil
-}
-
-func (x *grpcWatchServer) Context() context.Context {
-	return getContext()
+func getContext() context.Context {
+	return util.WithNoCache(util.SetDomainProject(context.Background(), "default", "default"))
 }
 
 func TestInstanceService_WebSocketWatch(t *testing.T) {
 	defer func() {
 		recover()
 	}()
-	disco.WebSocketWatch(context.Background(), &pb.WatchInstanceRequest{}, nil)
+	pubsub.Watch(context.Background(), &pb.WatchInstanceRequest{}, nil)
 }
 
 var _ = Describe("'Instance' service", func() {
@@ -54,7 +46,7 @@ var _ = Describe("'Instance' service", func() {
 		)
 
 		It("should be passed", func() {
-			respCreate, err := serviceResource.Create(getContext(), &pb.CreateServiceRequest{
+			respCreate, err := disco.RegisterService(getContext(), &pb.CreateServiceRequest{
 				Service: &pb.MicroService{
 					ServiceName: "service_name_watch",
 					AppId:       "service_name_watch",
@@ -71,27 +63,15 @@ var _ = Describe("'Instance' service", func() {
 		Context("when request is invalid", func() {
 			It("should be failed", func() {
 				By("service does not exist")
-				err := disco.WatchPreOpera(getContext(), &pb.WatchInstanceRequest{
-					SelfServiceId: "-1",
-				})
-				Expect(err).NotTo(BeNil())
-
-				err = disco.Watch(&pb.WatchInstanceRequest{
-					SelfServiceId: "-1",
-				}, &grpcWatchServer{})
+				err := pubsub.ExistService(getContext(), "-1")
 				Expect(err).NotTo(BeNil())
 
 				By("service id is empty")
-				err = disco.WatchPreOpera(getContext(), &pb.WatchInstanceRequest{
-					SelfServiceId: "",
-				})
+				err = pubsub.ExistService(getContext(), "")
 				Expect(err).NotTo(BeNil())
 
 				By("request is valid")
-				err = disco.WatchPreOpera(getContext(),
-					&pb.WatchInstanceRequest{
-						SelfServiceId: serviceId,
-					})
+				err = pubsub.ExistService(getContext(), serviceId)
 				Expect(err).To(BeNil())
 			})
 		})
diff --git a/server/connection/ws/broker.go b/server/pubsub/ws/broker.go
similarity index 100%
rename from server/connection/ws/broker.go
rename to server/pubsub/ws/broker.go
diff --git a/server/connection/ws/broker_test.go b/server/pubsub/ws/broker_test.go
similarity index 95%
rename from server/connection/ws/broker_test.go
rename to server/pubsub/ws/broker_test.go
index 2c95501..55241fb 100644
--- a/server/connection/ws/broker_test.go
+++ b/server/pubsub/ws/broker_test.go
@@ -21,8 +21,8 @@ import (
 	"context"
 	"testing"
 
-	"github.com/apache/servicecomb-service-center/server/connection/ws"
 	"github.com/apache/servicecomb-service-center/server/event"
+	"github.com/apache/servicecomb-service-center/server/pubsub/ws"
 	"github.com/stretchr/testify/assert"
 )
 
diff --git a/server/connection/ws/common.go b/server/pubsub/ws/common.go
similarity index 100%
rename from server/connection/ws/common.go
rename to server/pubsub/ws/common.go
diff --git a/server/connection/ws/common_test.go b/server/pubsub/ws/common_test.go
similarity index 88%
rename from server/connection/ws/common_test.go
rename to server/pubsub/ws/common_test.go
index df8ae55..95656db 100644
--- a/server/connection/ws/common_test.go
+++ b/server/pubsub/ws/common_test.go
@@ -22,14 +22,14 @@ import (
 	"errors"
 	"testing"
 
-	wss "github.com/apache/servicecomb-service-center/server/connection/ws"
+	"github.com/apache/servicecomb-service-center/server/pubsub/ws"
 	"github.com/stretchr/testify/assert"
 )
 
 func TestSendEstablishError(t *testing.T) {
 	mock := NewTest()
 	t.Run("should read the err when call", func(t *testing.T) {
-		wss.SendEstablishError(mock.ServerConn, errors.New("error"))
+		ws.SendEstablishError(mock.ServerConn, errors.New("error"))
 		_, message, err := mock.ClientConn.ReadMessage()
 		assert.Nil(t, err)
 		assert.Equal(t, "error", string(message))
@@ -42,6 +42,6 @@ func TestWatch(t *testing.T) {
 		mock.ServerConn.Close()
 		ctx, cancel := context.WithCancel(context.Background())
 		cancel()
-		wss.Watch(ctx, "", mock.ServerConn)
+		ws.Watch(ctx, "", mock.ServerConn)
 	})
 }
diff --git a/server/connection/connection.go b/server/pubsub/ws/connection.go
similarity index 98%
rename from server/connection/connection.go
rename to server/pubsub/ws/connection.go
index c34005f..fb198ca 100644
--- a/server/connection/connection.go
+++ b/server/pubsub/ws/connection.go
@@ -16,7 +16,7 @@
  */
 
 // connection pkg impl the pub/sub mechanism of the long connection of diff protocols
-package connection
+package ws
 
 import "time"
 
diff --git a/server/connection/ws/health_check.go b/server/pubsub/ws/health_check.go
similarity index 100%
rename from server/connection/ws/health_check.go
rename to server/pubsub/ws/health_check.go
diff --git a/server/connection/ws/health_check_test.go b/server/pubsub/ws/health_check_test.go
similarity index 95%
rename from server/connection/ws/health_check_test.go
rename to server/pubsub/ws/health_check_test.go
index 9928f3e..1dfbaf1 100644
--- a/server/connection/ws/health_check_test.go
+++ b/server/pubsub/ws/health_check_test.go
@@ -20,7 +20,7 @@ package ws_test
 import (
 	"testing"
 
-	"github.com/apache/servicecomb-service-center/server/connection/ws"
+	"github.com/apache/servicecomb-service-center/server/pubsub/ws"
 	"github.com/stretchr/testify/assert"
 )
 
diff --git a/server/connection/ws/options.go b/server/pubsub/ws/options.go
similarity index 83%
rename from server/connection/ws/options.go
rename to server/pubsub/ws/options.go
index 3196623..a3f1cd7 100644
--- a/server/connection/ws/options.go
+++ b/server/pubsub/ws/options.go
@@ -19,8 +19,6 @@ package ws
 
 import (
 	"time"
-
-	"github.com/apache/servicecomb-service-center/server/connection"
 )
 
 type Options struct {
@@ -31,8 +29,8 @@ type Options struct {
 
 func ToOptions() Options {
 	return Options{
-		ReadTimeout:    connection.ReadTimeout,
-		SendTimeout:    connection.SendTimeout,
-		HealthInterval: connection.HeartbeatInterval,
+		ReadTimeout:    ReadTimeout,
+		SendTimeout:    SendTimeout,
+		HealthInterval: HeartbeatInterval,
 	}
 }
diff --git a/server/connection/ws/websocket.go b/server/pubsub/ws/websocket.go
similarity index 97%
rename from server/connection/ws/websocket.go
rename to server/pubsub/ws/websocket.go
index 0014b8c..14482bd 100644
--- a/server/connection/ws/websocket.go
+++ b/server/pubsub/ws/websocket.go
@@ -25,7 +25,6 @@ import (
 	"github.com/apache/servicecomb-service-center/datasource"
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/util"
-	"github.com/apache/servicecomb-service-center/server/connection"
 	pb "github.com/go-chassis/cari/discovery"
 	"github.com/gorilla/websocket"
 )
@@ -98,7 +97,7 @@ func (wh *WebSocket) registerMessageHandler() {
 }
 
 func (wh *WebSocket) ReadMessage() error {
-	wh.Conn.SetReadLimit(connection.ReadMaxBody)
+	wh.Conn.SetReadLimit(ReadMaxBody)
 	err := wh.Conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
 	if err != nil {
 		log.Error("", err)
diff --git a/server/connection/ws/websocket_test.go b/server/pubsub/ws/websocket_test.go
similarity index 92%
rename from server/connection/ws/websocket_test.go
rename to server/pubsub/ws/websocket_test.go
index cd4ad8b..deb1d19 100644
--- a/server/connection/ws/websocket_test.go
+++ b/server/pubsub/ws/websocket_test.go
@@ -26,9 +26,9 @@ import (
 	"testing"
 	"time"
 
-	wss "github.com/apache/servicecomb-service-center/server/connection/ws"
 	"github.com/apache/servicecomb-service-center/server/core"
 	"github.com/apache/servicecomb-service-center/server/event"
+	"github.com/apache/servicecomb-service-center/server/pubsub/ws"
 	"github.com/gorilla/websocket"
 	"github.com/stretchr/testify/assert"
 )
@@ -81,15 +81,15 @@ func NewTest() *watcherConn {
 func TestNewWebSocket(t *testing.T) {
 	mock := NewTest()
 	t.Run("should return not nil when new", func(t *testing.T) {
-		assert.NotNil(t, wss.NewWebSocket("", "", mock.ServerConn))
+		assert.NotNil(t, ws.NewWebSocket("", "", mock.ServerConn))
 	})
 }
 
 func TestWebSocket_NeedCheck(t *testing.T) {
 	mock := NewTest()
 	conn := mock.ServerConn
-	options := wss.ToOptions()
-	webSocket := &wss.WebSocket{
+	options := ws.ToOptions()
+	webSocket := &ws.WebSocket{
 		Options:       options,
 		DomainProject: "default",
 		ConsumerID:    "",
@@ -119,7 +119,7 @@ func TestWebSocket_NeedCheck(t *testing.T) {
 
 func TestWebSocket_Idle(t *testing.T) {
 	mock := NewTest()
-	webSocket := wss.NewWebSocket("", "", mock.ServerConn)
+	webSocket := ws.NewWebSocket("", "", mock.ServerConn)
 
 	t.Run("should idle when new", func(t *testing.T) {
 		select {
@@ -156,13 +156,13 @@ func TestWebSocket_CheckHealth(t *testing.T) {
 	event.Center().Start()
 
 	t.Run("should do nothing when recv PING", func(t *testing.T) {
-		ws := wss.NewWebSocket("", "", mock.ServerConn)
+		ws := ws.NewWebSocket("", "", mock.ServerConn)
 		mock.ClientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
 		<-time.After(time.Second)
 		assert.Nil(t, ws.CheckHealth(context.Background()))
 	})
 	t.Run("should return err when consumer not exist", func(t *testing.T) {
-		ws := wss.NewWebSocket("", "", mock.ServerConn)
+		ws := ws.NewWebSocket("", "", mock.ServerConn)
 		assert.Equal(t, "service does not exist", ws.CheckHealth(context.Background()).Error())
 	})
 }
diff --git a/server/rest/controller/v4/instance_watcher.go b/server/rest/controller/v4/instance_watcher.go
index b9f9bf8..24ddb4d 100644
--- a/server/rest/controller/v4/instance_watcher.go
+++ b/server/rest/controller/v4/instance_watcher.go
@@ -20,12 +20,11 @@ package v4
 import (
 	"net/http"
 
-	discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
-	"github.com/apache/servicecomb-service-center/server/service/heartbeat"
-
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/rest"
 	"github.com/apache/servicecomb-service-center/server/handler/exception"
+	"github.com/apache/servicecomb-service-center/server/pubsub"
+	"github.com/apache/servicecomb-service-center/server/service/heartbeat"
 	pb "github.com/go-chassis/cari/discovery"
 	"github.com/gorilla/websocket"
 )
@@ -72,7 +71,7 @@ func (s *WatchService) Watch(w http.ResponseWriter, r *http.Request) {
 	defer conn.Close()
 
 	r.Method = "WATCH"
-	discosvc.WebSocketWatch(r.Context(), &pb.WatchInstanceRequest{
+	pubsub.Watch(r.Context(), &pb.WatchInstanceRequest{
 		SelfServiceId: r.URL.Query().Get(":serviceId"),
 	}, conn)
 }
diff --git a/server/server.go b/server/server.go
index 87b430c..4be3771 100644
--- a/server/server.go
+++ b/server/server.go
@@ -60,10 +60,8 @@ type endpoint struct {
 }
 
 type ServiceCenterServer struct {
-	REST endpoint
-	GRPC endpoint
-
-	apiService          *APIServer
+	Endpoint            endpoint
+	APIServer           *APIServer
 	eventCenter         *nf.BusService
 	syncerNotifyService *snf.Service
 }
@@ -79,7 +77,7 @@ func (s *ServiceCenterServer) Run() {
 }
 
 func (s *ServiceCenterServer) waitForQuit() {
-	err := <-s.apiService.Err()
+	err := <-s.APIServer.Err()
 	if err != nil {
 		log.Errorf(err, "service center catch errors")
 	}
@@ -95,16 +93,14 @@ func (s *ServiceCenterServer) initialize() {
 	s.initSSL()
 	// Datasource
 	s.initDatasource()
-	s.apiService = GetAPIServer()
+	s.APIServer = GetAPIServer()
 	s.eventCenter = event.Center()
 	s.syncerNotifyService = snf.GetSyncerNotifyCenter()
 }
 
 func (s *ServiceCenterServer) initEndpoints() {
-	s.REST.Host = config.GetString("server.host", "", config.WithStandby("httpaddr"))
-	s.REST.Port = config.GetString("server.port", "", config.WithStandby("httpport"))
-	s.GRPC.Host = config.GetString("server.rpc.host", "", config.WithStandby("rpcaddr"))
-	s.GRPC.Port = config.GetString("server.rpc.port", "", config.WithStandby("rpcport"))
+	s.Endpoint.Host = config.GetString("server.host", "", config.WithStandby("httpaddr"))
+	s.Endpoint.Port = config.GetString("server.port", "", config.WithStandby("httpport"))
 }
 
 func (s *ServiceCenterServer) initDatasource() {
@@ -138,14 +134,10 @@ func (s *ServiceCenterServer) initMetrics() {
 		interval = defaultCollectPeriod
 	}
 	var instance string
-	if len(s.REST.Host) > 0 {
-		instance = net.JoinHostPort(s.REST.Host, s.REST.Port)
+	if len(s.Endpoint.Host) > 0 {
+		instance = net.JoinHostPort(s.Endpoint.Host, s.Endpoint.Port)
 	} else {
-		if len(s.GRPC.Host) > 0 {
-			instance = net.JoinHostPort(s.GRPC.Host, s.GRPC.Port)
-		} else {
-			log.Fatal("init metrics InstanceName failed", nil)
-		}
+		log.Fatal("init metrics InstanceName failed", nil)
 	}
 
 	if err := metrics.Init(metrics.Options{
@@ -207,14 +199,13 @@ func (s *ServiceCenterServer) startServices() {
 
 func (s *ServiceCenterServer) startAPIService() {
 	core.Instance.HostName = util.HostName()
-	s.apiService.AddListener(REST, s.REST.Host, s.REST.Port)
-	s.apiService.AddListener(RPC, s.REST.Host, s.GRPC.Port)
-	s.apiService.Start()
+	s.APIServer.AddListener(s.Endpoint.Host, s.Endpoint.Port)
+	s.APIServer.Start()
 }
 
 func (s *ServiceCenterServer) Stop() {
-	if s.apiService != nil {
-		s.apiService.Stop()
+	if s.APIServer != nil {
+		s.APIServer.Stop()
 	}
 
 	if s.eventCenter != nil {
diff --git a/server/service/disco/watch.go b/server/service/disco/watch.go
deleted file mode 100644
index 8870816..0000000
--- a/server/service/disco/watch.go
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package disco
-
-import (
-	"context"
-	"errors"
-	"fmt"
-
-	pb "github.com/go-chassis/cari/discovery"
-	"github.com/gorilla/websocket"
-
-	"github.com/apache/servicecomb-service-center/datasource"
-	"github.com/apache/servicecomb-service-center/pkg/log"
-	"github.com/apache/servicecomb-service-center/pkg/proto"
-	"github.com/apache/servicecomb-service-center/server/connection/grpc"
-	"github.com/apache/servicecomb-service-center/server/connection/ws"
-)
-
-func WatchPreOpera(ctx context.Context, in *pb.WatchInstanceRequest) error {
-	if in == nil || len(in.SelfServiceId) == 0 {
-		return errors.New("request format invalid")
-	}
-	resp, err := datasource.GetMetadataManager().ExistServiceByID(ctx, &pb.GetExistenceByIDRequest{
-		ServiceId: in.SelfServiceId,
-	})
-	if err != nil {
-		log.Error("", err)
-		return err
-	}
-	if !resp.Exist {
-		return datasource.ErrServiceNotExists
-	}
-	return nil
-}
-
-func Watch(in *pb.WatchInstanceRequest, stream proto.ServiceInstanceCtrlWatchServer) error {
-	log.Infof("new a stream list and watch with service[%s]", in.SelfServiceId)
-	if err := WatchPreOpera(stream.Context(), in); err != nil {
-		log.Errorf(err, "service[%s] establish watch failed: invalid params", in.SelfServiceId)
-		return err
-	}
-
-	return grpc.Watch(stream.Context(), in.SelfServiceId, stream)
-}
-
-func WebSocketWatch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) {
-	log.Infof("new a web socket watch with service[%s]", in.SelfServiceId)
-	if err := WatchPreOpera(ctx, in); err != nil {
-		ws.SendEstablishError(conn, err)
-		return
-	}
-	ws.Watch(ctx, in.SelfServiceId, conn)
-}
-
-func QueryAllProvidersInstances(ctx context.Context, in *pb.WatchInstanceRequest) ([]*pb.WatchInstanceResponse, int64) {
-	depResp, err := datasource.GetDependencyManager().SearchConsumerDependency(ctx, &pb.GetDependenciesRequest{
-		ServiceId: in.SelfServiceId,
-	})
-	if err != nil {
-		log.Error(fmt.Sprintf("search service[%s] dependencies failed", in.SelfServiceId), err)
-		return nil, 0
-	}
-	if depResp.Response.GetCode() != pb.ResponseSuccess {
-		log.Error(fmt.Sprintf("search service[%s] dependencies failed. %s",
-			in.SelfServiceId, depResp.Response.GetMessage()), nil)
-		return nil, 0
-	}
-	var results []*pb.WatchInstanceResponse
-	for _, provider := range depResp.Providers {
-		instResp, err := datasource.GetMetadataManager().GetInstances(ctx, &pb.GetInstancesRequest{
-			ProviderServiceId: provider.ServiceId,
-		})
-		if err != nil {
-			log.Error(fmt.Sprintf("get service[%s] instances failed", in.SelfServiceId), err)
-			return nil, 0
-		}
-		if instResp.Response.GetCode() != pb.ResponseSuccess {
-			log.Error(fmt.Sprintf("get service[%s] instances failed. %s",
-				in.SelfServiceId, instResp.Response.GetMessage()), nil)
-			return nil, 0
-		}
-		for _, instance := range instResp.Instances {
-			results = append(results, &pb.WatchInstanceResponse{
-				Response: pb.CreateResponse(pb.ResponseSuccess, "List instance successfully."),
-				Action:   string(pb.EVT_INIT),
-				Key: &pb.MicroServiceKey{
-					Environment: provider.Environment,
-					AppId:       provider.AppId,
-					ServiceName: provider.ServiceName,
-					Version:     provider.Version,
-				},
-				Instance: instance,
-			})
-		}
-	}
-	return results, 0
-}
diff --git a/server/service/heartbeat/websocket.go b/server/service/heartbeat/websocket.go
index 263ae2e..71cde2e 100644
--- a/server/service/heartbeat/websocket.go
+++ b/server/service/heartbeat/websocket.go
@@ -24,17 +24,16 @@ import (
 	"sync"
 	"time"
 
-	"github.com/apache/servicecomb-service-center/datasource"
-	discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
-
 	pb "github.com/go-chassis/cari/discovery"
 	"github.com/gorilla/websocket"
 
+	"github.com/apache/servicecomb-service-center/datasource"
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 	"github.com/apache/servicecomb-service-center/server/config"
-	"github.com/apache/servicecomb-service-center/server/connection"
 	"github.com/apache/servicecomb-service-center/server/metrics"
+	"github.com/apache/servicecomb-service-center/server/pubsub/ws"
+	discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
 )
 
 const (
@@ -81,7 +80,7 @@ func (c *client) sendClose(code int, text string) error {
 	if code != websocket.CloseNoStatusReceived {
 		message = websocket.FormatCloseMessage(code, text)
 	}
-	err := c.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(connection.SendTimeout))
+	err := c.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(ws.SendTimeout))
 	if err != nil {
 		log.Error(fmt.Sprintf("watcher[%s] catch an err", remoteAddr), err)
 		return err
@@ -98,7 +97,7 @@ func (c *client) heartbeat() {
 	}()
 	for {
 		<-ticker.C
-		err := c.conn.SetWriteDeadline(time.Now().Add(connection.SendTimeout))
+		err := c.conn.SetWriteDeadline(time.Now().Add(ws.SendTimeout))
 		if err != nil {
 			log.Error("", err)
 		}
@@ -116,7 +115,7 @@ func (c *client) handleMessage() {
 
 	remoteAddr := c.conn.RemoteAddr().String()
 	c.conn.SetPongHandler(func(message string) error {
-		err := c.conn.SetReadDeadline(time.Now().Add(connection.ReadTimeout))
+		err := c.conn.SetReadDeadline(time.Now().Add(ws.ReadTimeout))
 		if err != nil {
 			log.Error("", err)
 		}