You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2020/11/16 07:53:04 UTC

[servicecomb-service-center] branch master updated: SCB-2094 Refactor notification center (#741)

This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 600b323  SCB-2094 Refactor notification center (#741)
600b323 is described below

commit 600b32303ad6fbda4cdebe29737bad8bb8978bdc
Author: little-cui <su...@qq.com>
AuthorDate: Mon Nov 16 15:44:07 2020 +0800

    SCB-2094 Refactor notification center (#741)
    
    * SCB-2094 Refactor notification center
    
    * SCB-2094 Add pkg description
---
 datasource/etcd/event/instance_event_handler.go    |  8 +--
 datasource/etcd/event/rule_event_handler.go        |  2 +-
 datasource/etcd/event/tag_event_handler.go         |  2 +-
 pkg/notify/common.go                               |  4 --
 pkg/notify/notice_test.go                          |  5 +-
 pkg/notify/notification_healthchecker.go           | 63 --------------------
 pkg/notify/notification_service.go                 | 12 ++--
 pkg/notify/notification_test.go                    |  4 +-
 pkg/notify/processor.go                            | 10 ++--
 pkg/notify/subscriber_checker.go                   | 67 ++++++++++++++++++++++
 pkg/notify/types.go                                | 12 ++--
 pkg/notify/types_test.go                           |  7 +--
 server/alarm/common.go                             |  2 +-
 server/alarm/service.go                            |  6 +-
 .../common.go => server/connection/connection.go   | 12 ++--
 server/{notify => connection/grpc}/stream.go       | 28 +++++----
 server/{notify => connection/grpc}/stream_test.go  | 15 ++---
 server/{notify => connection}/metrics.go           | 32 ++++++-----
 server/{notify => connection/ws}/publisher.go      |  4 +-
 server/{notify => connection/ws}/websocket.go      | 44 +++++++-------
 server/{notify => connection/ws}/websocket_test.go | 23 ++++----
 server/health/health_test.go                       |  2 +-
 server/notify/center.go                            |  3 +-
 server/notify/common.go                            | 29 ----------
 .../{listwatcher.go => instance_subscriber.go}     |  9 ++-
 server/server.go                                   |  2 +-
 server/service/watch.go                            | 13 +++--
 27 files changed, 205 insertions(+), 215 deletions(-)

diff --git a/datasource/etcd/event/instance_event_handler.go b/datasource/etcd/event/instance_event_handler.go
index ddce862..d424dc9 100644
--- a/datasource/etcd/event/instance_event_handler.go
+++ b/datasource/etcd/event/instance_event_handler.go
@@ -82,7 +82,7 @@ func (h *InstanceEventHandler) OnEvent(evt sd.KvEvent) {
 		}
 	}
 
-	if notify.GetNotifyCenter().Closed() {
+	if notify.Center().Closed() {
 		log.Warnf("caught [%s] instance[%s/%s] event, endpoints %v, but notify service is closed",
 			action, providerID, providerInstanceID, instance.Endpoints)
 		return
@@ -136,10 +136,10 @@ func PublishInstanceEvent(evt sd.KvEvent, domainProject string, serviceKey *pb.M
 	}
 	for _, consumerID := range subscribers {
 		// TODO add超时怎么处理?
-		job := notify.NewInstanceEventWithTime(consumerID, domainProject, evt.Revision, evt.CreateAt, response)
-		err := notify.GetNotifyCenter().Publish(job)
+		evt := notify.NewInstanceEventWithTime(consumerID, domainProject, evt.Revision, evt.CreateAt, response)
+		err := notify.Center().Publish(evt)
 		if err != nil {
-			log.Errorf(err, "publish job failed")
+			log.Errorf(err, "publish event[%v] into channel failed", evt)
 		}
 	}
 }
diff --git a/datasource/etcd/event/rule_event_handler.go b/datasource/etcd/event/rule_event_handler.go
index ba6a10d..0e62b75 100644
--- a/datasource/etcd/event/rule_event_handler.go
+++ b/datasource/etcd/event/rule_event_handler.go
@@ -98,7 +98,7 @@ func (h *RuleEventHandler) OnEvent(evt sd.KvEvent) {
 	}
 
 	providerID, ruleID, domainProject := core.GetInfoFromRuleKV(evt.KV.Key)
-	if notify.GetNotifyCenter().Closed() {
+	if notify.Center().Closed() {
 		log.Warnf("caught [%s] service rule[%s/%s] event, but notify service is closed",
 			action, providerID, ruleID)
 		return
diff --git a/datasource/etcd/event/tag_event_handler.go b/datasource/etcd/event/tag_event_handler.go
index c010132..c66a330 100644
--- a/datasource/etcd/event/tag_event_handler.go
+++ b/datasource/etcd/event/tag_event_handler.go
@@ -112,7 +112,7 @@ func (h *TagEventHandler) OnEvent(evt sd.KvEvent) {
 
 	consumerID, domainProject := core.GetInfoFromTagKV(evt.KV.Key)
 
-	if notify.GetNotifyCenter().Closed() {
+	if notify.Center().Closed() {
 		log.Warnf("caught [%s] service tags[%s/%s] event, but notify service is closed",
 			action, consumerID, evt.KV.Value)
 		return
diff --git a/pkg/notify/common.go b/pkg/notify/common.go
index c98e31c..a6b5337 100644
--- a/pkg/notify/common.go
+++ b/pkg/notify/common.go
@@ -20,7 +20,3 @@ package notify
 const (
 	DefaultQueueSize = 1000
 )
-
-const (
-	NOTIFTY Type = iota
-)
diff --git a/pkg/notify/notice_test.go b/pkg/notify/notice_test.go
index 5ad78d2..3eba149 100644
--- a/pkg/notify/notice_test.go
+++ b/pkg/notify/notice_test.go
@@ -21,13 +21,14 @@ import (
 )
 
 func TestNewEventWithTime(t *testing.T) {
-	evt := NewEvent(NOTIFTY, "a", "b")
+	regT := RegisterType("N", 1)
+	evt := NewEvent(regT, "a", "b")
 	if evt.CreateAt().UnixNano() == 0 {
 		t.Fatal("TestNewEventWithTime")
 	}
 	fmt.Println(evt.CreateAt())
 
-	if evt.Type() != NOTIFTY || evt.Subject() != "a" || evt.Group() != "b" {
+	if evt.Type() != regT || evt.Subject() != "a" || evt.Group() != "b" {
 		t.Fatal("TestNewEventWithTime")
 	}
 }
diff --git a/pkg/notify/notification_healthchecker.go b/pkg/notify/notification_healthchecker.go
deleted file mode 100644
index 2466d8c..0000000
--- a/pkg/notify/notification_healthchecker.go
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package notify
-
-import "github.com/apache/servicecomb-service-center/pkg/log"
-
-const (
-	ServerCheckerName  = "__HealthChecker__"
-	ServerCheckSubject = "__NotifyServerHealthCheck__"
-)
-
-//Notifier 健康检查
-type ServiceHealthChecker struct {
-	Subscriber
-}
-
-type ServiceHealthCheckJob struct {
-	Event
-	ErrorSubscriber Subscriber
-}
-
-func (s *ServiceHealthChecker) OnMessage(job Event) {
-	j := job.(*ServiceHealthCheckJob)
-	err := j.ErrorSubscriber.Err()
-
-	if j.ErrorSubscriber.Type() == NOTIFTY {
-		log.Errorf(nil, "remove %s watcher failed, here cause a dead lock, subject: %s, group: %s",
-			j.ErrorSubscriber.Type(), j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Group())
-		return
-	}
-
-	log.Debugf("notification service remove %s watcher, error: %v, subject: %s, group: %s",
-		j.ErrorSubscriber.Type(), err, j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Group())
-	s.Service().RemoveSubscriber(j.ErrorSubscriber)
-}
-
-func NewNotifyServiceHealthChecker() *ServiceHealthChecker {
-	return &ServiceHealthChecker{
-		Subscriber: NewSubscriber(NOTIFTY, ServerCheckSubject, ServerCheckerName),
-	}
-}
-
-func NewNotifyServiceHealthCheckJob(s Subscriber) *ServiceHealthCheckJob {
-	return &ServiceHealthCheckJob{
-		Event:           NewEvent(NOTIFTY, ServerCheckSubject, ServerCheckerName),
-		ErrorSubscriber: s,
-	}
-}
diff --git a/pkg/notify/notification_service.go b/pkg/notify/notification_service.go
index 445e428..44ea711 100644
--- a/pkg/notify/notification_service.go
+++ b/pkg/notify/notification_service.go
@@ -62,7 +62,7 @@ func (s *Service) Start() {
 	s.mux.Unlock()
 
 	// 错误subscriber清理
-	err := s.AddSubscriber(NewNotifyServiceHealthChecker())
+	err := s.AddSubscriber(NewSubscriberChecker())
 	if err != nil {
 		log.Error("", err)
 	}
@@ -114,19 +114,19 @@ func (s *Service) stopProcessors() {
 }
 
 //通知内容塞到队列里
-func (s *Service) Publish(job Event) error {
+func (s *Service) Publish(evt Event) error {
 	if s.Closed() {
-		return errors.New("add notify job failed for server shutdown")
+		return errors.New("add notify event failed for server shutdown")
 	}
 
 	s.mux.RLock()
-	p, ok := s.processors[job.Type()]
+	p, ok := s.processors[evt.Type()]
 	if !ok {
 		s.mux.RUnlock()
-		return errors.New("Unknown job type")
+		return errors.New("unknown event type")
 	}
 	s.mux.RUnlock()
-	p.Accept(job)
+	p.Accept(evt)
 	return nil
 }
 
diff --git a/pkg/notify/notification_test.go b/pkg/notify/notification_test.go
index f2af609..75a06c0 100644
--- a/pkg/notify/notification_test.go
+++ b/pkg/notify/notification_test.go
@@ -66,11 +66,11 @@ func TestGetNotifyService(t *testing.T) {
 	if err != nil {
 		t.Fatalf("TestGetNotifyService failed")
 	}
-	err = notifyService.Publish(NewNotifyServiceHealthCheckJob(NewNotifyServiceHealthChecker()))
+	err = notifyService.Publish(NewErrEvent(NewSubscriberChecker()))
 	if err != nil {
 		t.Fatalf("TestGetNotifyService failed")
 	}
-	err = notifyService.Publish(NewNotifyServiceHealthCheckJob(s))
+	err = notifyService.Publish(NewErrEvent(s))
 	if err != nil {
 		t.Fatalf("TestGetNotifyService failed")
 	}
diff --git a/pkg/notify/processor.go b/pkg/notify/processor.go
index 5b24885..b5f1771 100644
--- a/pkg/notify/processor.go
+++ b/pkg/notify/processor.go
@@ -34,17 +34,17 @@ func (p *Processor) Name() string {
 	return p.name
 }
 
-func (p *Processor) Accept(job Event) {
-	p.Add(queue.Task{Object: job})
+func (p *Processor) Accept(evt Event) {
+	p.Add(queue.Task{Object: evt})
 }
 
 func (p *Processor) Handle(ctx context.Context, obj interface{}) {
 	p.Notify(obj.(Event))
 }
 
-func (p *Processor) Notify(job Event) {
-	if itf, ok := p.subjects.Get(job.Subject()); ok {
-		itf.(*Subject).Notify(job)
+func (p *Processor) Notify(evt Event) {
+	if itf, ok := p.subjects.Get(evt.Subject()); ok {
+		itf.(*Subject).Notify(evt)
 	}
 }
 
diff --git a/pkg/notify/subscriber_checker.go b/pkg/notify/subscriber_checker.go
new file mode 100644
index 0000000..af73396
--- /dev/null
+++ b/pkg/notify/subscriber_checker.go
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package notify
+
+import (
+	"github.com/apache/servicecomb-service-center/pkg/log"
+)
+
+const (
+	groupCheck   = "__HealthChecker__"
+	subjectCheck = "__SubscriberHealthCheck__"
+)
+
+var TypeCheck = RegisterType("CHECKER", DefaultQueueSize)
+
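+// SubscriberChecker is the subscriber health checker; on an ErrEvent it removes the reported subscriber from the service.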
+type SubscriberChecker struct {
+	Subscriber
+}
+
+type ErrEvent struct {
+	Event
+	Subscriber Subscriber
+}
+
+func (s *SubscriberChecker) OnMessage(evt Event) {
+	j := evt.(*ErrEvent)
+	err := j.Subscriber.Err()
+
+	if j.Subscriber.Type() == TypeCheck {
+		log.Errorf(nil, "remove %s subscriber failed, here cause a dead lock, subject: %s, group: %s",
+			j.Subscriber.Type(), j.Subscriber.Subject(), j.Subscriber.Group())
+		return
+	}
+
+	log.Debugf("notification service remove %s subscriber, error: %v, subject: %s, group: %s",
+		j.Subscriber.Type(), err, j.Subscriber.Subject(), j.Subscriber.Group())
+	s.Service().RemoveSubscriber(j.Subscriber)
+}
+
+func NewSubscriberChecker() *SubscriberChecker {
+	return &SubscriberChecker{
+		Subscriber: NewSubscriber(TypeCheck, subjectCheck, groupCheck),
+	}
+}
+
+func NewErrEvent(by Subscriber) *ErrEvent {
+	return &ErrEvent{
+		Event:      NewEvent(TypeCheck, subjectCheck, groupCheck),
+		Subscriber: by,
+	}
+}
diff --git a/pkg/notify/types.go b/pkg/notify/types.go
index d6cba5d..3cf125c 100644
--- a/pkg/notify/types.go
+++ b/pkg/notify/types.go
@@ -15,7 +15,9 @@
 
 package notify
 
-import "strconv"
+import (
+	"strconv"
+)
 
 type Type int
 
@@ -40,13 +42,9 @@ func (nt Type) IsValid() bool {
 	return nt >= 0 && int(nt) < len(typeQueues)
 }
 
-var typeNames = []string{
-	NOTIFTY: "NOTIFTY",
-}
+var typeNames []string
 
-var typeQueues = []int{
-	NOTIFTY: 0,
-}
+var typeQueues []int
 
 func Types() (ts []Type) {
 	for i := range typeNames {
diff --git a/pkg/notify/types_test.go b/pkg/notify/types_test.go
index 720af81..c209c31 100644
--- a/pkg/notify/types_test.go
+++ b/pkg/notify/types_test.go
@@ -15,7 +15,9 @@
 
 package notify
 
-import "testing"
+import (
+	"testing"
+)
 
 func TestRegisterType(t *testing.T) {
 	id := RegisterType("a", 0)
@@ -30,7 +32,4 @@ func TestRegisterType(t *testing.T) {
 	if id.String() != "Type999" || id.QueueSize() != DefaultQueueSize {
 		t.Fatal("TestRegisterType failed", id.String(), id.QueueSize())
 	}
-	if NOTIFTY.String() != "NOTIFTY" || NOTIFTY.QueueSize() != DefaultQueueSize {
-		t.Fatal("TestRegisterType failed", id.String(), id.QueueSize())
-	}
 }
diff --git a/server/alarm/common.go b/server/alarm/common.go
index 9243d13..1a8bf55 100644
--- a/server/alarm/common.go
+++ b/server/alarm/common.go
@@ -40,7 +40,7 @@ const (
 	Group   = "__ALARM_GROUP__"
 )
 
-var ALARM = notify.RegisterType("ALARM", 0)
+var ALARM = notify.RegisterType("ALARM", notify.DefaultQueueSize)
 
 func FieldBool(key string, v bool) model.Field {
 	return model.Field{Key: key, Value: v}
diff --git a/server/alarm/service.go b/server/alarm/service.go
index e399b91..8c981e8 100644
--- a/server/alarm/service.go
+++ b/server/alarm/service.go
@@ -44,7 +44,7 @@ func (ac *Service) Raise(id model.ID, fields ...model.Field) error {
 	for _, f := range fields {
 		ae.Fields[f.Key] = f.Value
 	}
-	return notify.GetNotifyCenter().Publish(ae)
+	return notify.Center().Publish(ae)
 }
 
 func (ac *Service) Clear(id model.ID) error {
@@ -53,7 +53,7 @@ func (ac *Service) Clear(id model.ID) error {
 		Status: Cleared,
 		ID:     id,
 	}
-	return notify.GetNotifyCenter().Publish(ae)
+	return notify.Center().Publish(ae)
 }
 
 func (ac *Service) ListAll() (ls []*model.AlarmEvent) {
@@ -88,7 +88,7 @@ func NewAlarmService() *Service {
 	c := &Service{
 		Subscriber: nf.NewSubscriber(ALARM, Subject, Group),
 	}
-	err := notify.GetNotifyCenter().AddSubscriber(c)
+	err := notify.Center().AddSubscriber(c)
 	if err != nil {
 		log.Error("", err)
 	}
diff --git a/pkg/notify/common.go b/server/connection/connection.go
similarity index 75%
copy from pkg/notify/common.go
copy to server/connection/connection.go
index c98e31c..c34005f 100644
--- a/pkg/notify/common.go
+++ b/server/connection/connection.go
@@ -15,12 +15,14 @@
  * limitations under the License.
  */
 
-package notify
+// Package connection implements the pub/sub mechanism for long-lived connections over different protocols.
+package connection
 
-const (
-	DefaultQueueSize = 1000
-)
+import "time"
 
 const (
-	NOTIFTY Type = iota
+	HeartbeatInterval = 30 * time.Second
+	ReadTimeout       = HeartbeatInterval * 4
+	SendTimeout       = 5 * time.Second
+	ReadMaxBody       = 64
 )
diff --git a/server/notify/stream.go b/server/connection/grpc/stream.go
similarity index 68%
rename from server/notify/stream.go
rename to server/connection/grpc/stream.go
index def2c6c..f0f9811 100644
--- a/server/notify/stream.go
+++ b/server/connection/grpc/stream.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package notify
+package grpc
 
 import (
 	"context"
@@ -24,18 +24,22 @@ import (
 	"github.com/apache/servicecomb-service-center/pkg/proto"
 	pb "github.com/apache/servicecomb-service-center/pkg/registry"
 	"github.com/apache/servicecomb-service-center/pkg/util"
+	"github.com/apache/servicecomb-service-center/server/connection"
+	"github.com/apache/servicecomb-service-center/server/notify"
 	"time"
 )
 
-func HandleWatchJob(watcher *InstanceEventListWatcher, stream proto.ServiceInstanceCtrlWatchServer) (err error) {
-	timer := time.NewTimer(HeartbeatInterval)
+const GRPC = "gRPC"
+
+func Handle(watcher *notify.InstanceEventListWatcher, stream proto.ServiceInstanceCtrlWatchServer) (err error) {
+	timer := time.NewTimer(connection.HeartbeatInterval)
 	defer timer.Stop()
 	for {
 		select {
 		case <-stream.Context().Done():
 			return
 		case <-timer.C:
-			timer.Reset(HeartbeatInterval)
+			timer.Reset(connection.HeartbeatInterval)
 		case job := <-watcher.Job:
 			if job == nil {
 				err = errors.New("channel is closed")
@@ -51,28 +55,28 @@ func HandleWatchJob(watcher *InstanceEventListWatcher, stream proto.ServiceInsta
 				watcher.Subject(), watcher.Group())
 
 			err = stream.Send(resp)
-			ReportPublishCompleted(job, err)
+			connection.ReportPublishCompleted(job, err)
 			if err != nil {
 				log.Errorf(err, "send message error, subject: %s, group: %s",
 					watcher.Subject(), watcher.Group())
 				watcher.SetError(err)
 				return
 			}
-			util.ResetTimer(timer, HeartbeatInterval)
+			util.ResetTimer(timer, connection.HeartbeatInterval)
 		}
 	}
 }
 
-func DoStreamListAndWatch(ctx context.Context, serviceID string, f func() ([]*pb.WatchInstanceResponse, int64), stream proto.ServiceInstanceCtrlWatchServer) (err error) {
+func ListAndWatch(ctx context.Context, serviceID string, f func() ([]*pb.WatchInstanceResponse, int64), stream proto.ServiceInstanceCtrlWatchServer) (err error) {
 	domainProject := util.ParseDomainProject(ctx)
 	domain := util.ParseDomain(ctx)
-	watcher := NewInstanceEventListWatcher(serviceID, domainProject, f)
-	err = GetNotifyCenter().AddSubscriber(watcher)
+	watcher := notify.NewInstanceEventListWatcher(serviceID, domainProject, f)
+	err = notify.Center().AddSubscriber(watcher)
 	if err != nil {
 		return
 	}
-	ReportSubscriber(domain, GRPC, 1)
-	err = HandleWatchJob(watcher, stream)
-	ReportSubscriber(domain, GRPC, -1)
+	connection.ReportSubscriber(domain, GRPC, 1)
+	err = Handle(watcher, stream)
+	connection.ReportSubscriber(domain, GRPC, -1)
 	return
 }
diff --git a/server/notify/stream_test.go b/server/connection/grpc/stream_test.go
similarity index 77%
rename from server/notify/stream_test.go
rename to server/connection/grpc/stream_test.go
index 052d000..3fd8c9d 100644
--- a/server/notify/stream_test.go
+++ b/server/connection/grpc/stream_test.go
@@ -14,14 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notify_test
+package grpc_test
 
 import (
 	"context"
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	pb "github.com/apache/servicecomb-service-center/pkg/registry"
 	simple "github.com/apache/servicecomb-service-center/pkg/time"
-	. "github.com/apache/servicecomb-service-center/server/notify"
+	stream "github.com/apache/servicecomb-service-center/server/connection/grpc"
+	"github.com/apache/servicecomb-service-center/server/notify"
 	"google.golang.org/grpc"
 	"testing"
 	"time"
@@ -40,19 +41,19 @@ func (x *grpcWatchServer) Context() context.Context {
 }
 
 func TestHandleWatchJob(t *testing.T) {
-	w := NewInstanceEventListWatcher("g", "s", nil)
+	w := notify.NewInstanceEventListWatcher("g", "s", nil)
 	w.Job <- nil
-	err := HandleWatchJob(w, &grpcWatchServer{})
+	err := stream.Handle(w, &grpcWatchServer{})
 	if err == nil {
 		t.Fatalf("TestHandleWatchJob failed")
 	}
-	w.Job <- NewInstanceEventWithTime("g", "s", 1, simple.FromTime(time.Now()), nil)
+	w.Job <- notify.NewInstanceEventWithTime("g", "s", 1, simple.FromTime(time.Now()), nil)
 	w.Job <- nil
-	HandleWatchJob(w, &grpcWatchServer{})
+	stream.Handle(w, &grpcWatchServer{})
 }
 
 func TestDoStreamListAndWatch(t *testing.T) {
 	defer log.Recover()
-	err := DoStreamListAndWatch(context.Background(), "s", nil, nil)
+	err := stream.ListAndWatch(context.Background(), "s", nil, nil)
 	t.Fatal("TestDoStreamListAndWatch failed", err)
 }
diff --git a/server/notify/metrics.go b/server/connection/metrics.go
similarity index 70%
rename from server/notify/metrics.go
rename to server/connection/metrics.go
index 6bde05b..00b201a 100644
--- a/server/notify/metrics.go
+++ b/server/connection/metrics.go
@@ -1,19 +1,21 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-package notify
+package connection
 
 import (
 	"github.com/apache/servicecomb-service-center/pkg/notify"
diff --git a/server/notify/publisher.go b/server/connection/ws/publisher.go
similarity index 97%
rename from server/notify/publisher.go
rename to server/connection/ws/publisher.go
index 15c7b22..a8ee692 100644
--- a/server/notify/publisher.go
+++ b/server/connection/ws/publisher.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package notify
+package ws
 
 import (
 	"context"
@@ -47,7 +47,7 @@ func (wh *Publisher) Stop() {
 
 func (wh *Publisher) dispatch(ws *WebSocket, payload interface{}) {
 	wh.goroutine.Do(func(ctx context.Context) {
-		ws.HandleWatchWebSocketJob(payload)
+		ws.HandleEvent(payload)
 	})
 }
 
diff --git a/server/notify/websocket.go b/server/connection/ws/websocket.go
similarity index 87%
rename from server/notify/websocket.go
rename to server/connection/ws/websocket.go
index de6217b..a61ce16 100644
--- a/server/notify/websocket.go
+++ b/server/connection/ws/websocket.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package notify
+package ws
 
 import (
 	"context"
@@ -25,23 +25,27 @@ import (
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	pb "github.com/apache/servicecomb-service-center/pkg/registry"
 	"github.com/apache/servicecomb-service-center/pkg/util"
+	"github.com/apache/servicecomb-service-center/server/connection"
+	"github.com/apache/servicecomb-service-center/server/notify"
 	"github.com/gorilla/websocket"
 	"time"
 )
 
+const Websocket = "Websocket"
+
 type WebSocket struct {
 	ctx    context.Context
 	ticker *time.Ticker
 	conn   *websocket.Conn
 	// watcher subscribe the notification service event
-	watcher         *InstanceEventListWatcher
+	watcher         *notify.InstanceEventListWatcher
 	needPingWatcher bool
 	free            chan struct{}
 	closed          chan struct{}
 }
 
 func (wh *WebSocket) Init() error {
-	wh.ticker = time.NewTicker(HeartbeatInterval)
+	wh.ticker = time.NewTicker(connection.HeartbeatInterval)
 	wh.needPingWatcher = true
 	wh.free = make(chan struct{}, 1)
 	wh.closed = make(chan struct{})
@@ -51,7 +55,7 @@ func (wh *WebSocket) Init() error {
 	remoteAddr := wh.conn.RemoteAddr().String()
 
 	// put in notification service queue
-	if err := GetNotifyCenter().AddSubscriber(wh.watcher); err != nil {
+	if err := notify.Center().AddSubscriber(wh.watcher); err != nil {
 		err = fmt.Errorf("establish[%s] websocket watch failed: notify service error, %s",
 			remoteAddr, err.Error())
 		log.Errorf(nil, err.Error())
@@ -72,11 +76,11 @@ func (wh *WebSocket) Init() error {
 }
 
 func (wh *WebSocket) ReadTimeout() time.Duration {
-	return ReadTimeout
+	return connection.ReadTimeout
 }
 
 func (wh *WebSocket) SendTimeout() time.Duration {
-	return SendTimeout
+	return connection.SendTimeout
 }
 
 func (wh *WebSocket) Heartbeat(messageType int) error {
@@ -94,7 +98,7 @@ func (wh *WebSocket) Heartbeat(messageType int) error {
 	return nil
 }
 
-func (wh *WebSocket) HandleWatchWebSocketControlMessage() {
+func (wh *WebSocket) HandleControlMessage() {
 	remoteAddr := wh.conn.RemoteAddr().String()
 	// PING
 	wh.conn.SetPingHandler(func(message string) error {
@@ -130,7 +134,7 @@ func (wh *WebSocket) HandleWatchWebSocketControlMessage() {
 		return wh.sendClose(code, text)
 	})
 
-	wh.conn.SetReadLimit(ReadMaxBody)
+	wh.conn.SetReadLimit(connection.ReadMaxBody)
 	err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
 	if err != nil {
 		log.Error("", err)
@@ -186,8 +190,8 @@ func (wh *WebSocket) Pick() interface{} {
 	return nil
 }
 
-// HandleWatchWebSocketJob will be called if Pick() returns not nil
-func (wh *WebSocket) HandleWatchWebSocketJob(o interface{}) {
+// HandleEvent will be called if Pick() returns not nil
+func (wh *WebSocket) HandleEvent(o interface{}) {
 	defer wh.SetReady()
 
 	var (
@@ -222,7 +226,7 @@ func (wh *WebSocket) HandleWatchWebSocketJob(o interface{}) {
 		log.Debugf("send 'Ping' message to watcher[%s], subject: %s, group: %s",
 			remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
 		return
-	case *InstanceEvent:
+	case *notify.InstanceEvent:
 		resp := o.Response
 
 		providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, resp.Key.ServiceName, resp.Key.Version)
@@ -254,8 +258,8 @@ func (wh *WebSocket) HandleWatchWebSocketJob(o interface{}) {
 	}
 
 	err := wh.WriteMessage(message)
-	if evt, ok := o.(*InstanceEvent); ok {
-		ReportPublishCompleted(evt, err)
+	if evt, ok := o.(*notify.InstanceEvent); ok {
+		connection.ReportPublishCompleted(evt, err)
 	}
 	if err != nil {
 		log.Errorf(err, "watcher[%s] catch an err, subject: %s, group: %s",
@@ -286,14 +290,14 @@ func (wh *WebSocket) Stop() {
 	close(wh.closed)
 }
 
-func DoWebSocketListAndWatch(ctx context.Context, serviceID string, f func() ([]*pb.WatchInstanceResponse, int64), conn *websocket.Conn) {
+func ListAndWatch(ctx context.Context, serviceID string, f func() ([]*pb.WatchInstanceResponse, int64), conn *websocket.Conn) {
 	domainProject := util.ParseDomainProject(ctx)
 	domain := util.ParseDomain(ctx)
-	socket := NewWebSocket(ctx, conn, NewInstanceEventListWatcher(serviceID, domainProject, f))
+	socket := New(ctx, conn, notify.NewInstanceEventListWatcher(serviceID, domainProject, f))
 
-	ReportSubscriber(domain, Websocket, 1)
+	connection.ReportSubscriber(domain, Websocket, 1)
 	process(socket)
-	ReportSubscriber(domain, Websocket, -1)
+	connection.ReportSubscriber(domain, Websocket, -1)
 }
 
 func process(socket *WebSocket) {
@@ -301,12 +305,12 @@ func process(socket *WebSocket) {
 		return
 	}
 
-	socket.HandleWatchWebSocketControlMessage()
+	socket.HandleControlMessage()
 
 	socket.Stop()
 }
 
-func EstablishWebSocketError(conn *websocket.Conn, err error) {
+func SendEstablishError(conn *websocket.Conn, err error) {
 	remoteAddr := conn.RemoteAddr().String()
 	log.Errorf(err, "establish[%s] websocket watch failed.", remoteAddr)
 	if err := conn.WriteMessage(websocket.TextMessage, util.StringToBytesWithNoCopy(err.Error())); err != nil {
@@ -314,7 +318,7 @@ func EstablishWebSocketError(conn *websocket.Conn, err error) {
 	}
 }
 
-func NewWebSocket(ctx context.Context, conn *websocket.Conn, watcher *InstanceEventListWatcher) *WebSocket {
+func New(ctx context.Context, conn *websocket.Conn, watcher *notify.InstanceEventListWatcher) *WebSocket {
 	return &WebSocket{
 		ctx:     ctx,
 		conn:    conn,
diff --git a/server/notify/websocket_test.go b/server/connection/ws/websocket_test.go
similarity index 86%
rename from server/notify/websocket_test.go
rename to server/connection/ws/websocket_test.go
index 751bbdb..47063ba 100644
--- a/server/notify/websocket_test.go
+++ b/server/connection/ws/websocket_test.go
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notify_test
+package ws_test
 
 // initialize
 import _ "github.com/apache/servicecomb-service-center/test"
@@ -22,6 +22,7 @@ import (
 	"context"
 	"errors"
 	"github.com/apache/servicecomb-service-center/pkg/registry"
+	wss "github.com/apache/servicecomb-service-center/server/connection/ws"
 	"github.com/apache/servicecomb-service-center/server/core"
 	. "github.com/apache/servicecomb-service-center/server/notify"
 	"github.com/gorilla/websocket"
@@ -64,7 +65,7 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
 	conn, _, _ := websocket.DefaultDialer.Dial(
 		strings.Replace(s.URL, "http://", "ws://", 1), nil)
 
-	EstablishWebSocketError(conn, errors.New("error"))
+	wss.SendEstablishError(conn, errors.New("error"))
 
 	w := NewInstanceEventListWatcher("g", "s", func() (results []*registry.WatchInstanceResponse, rev int64) {
 		results = append(results, &registry.WatchInstanceResponse{
@@ -76,33 +77,33 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
 		return
 	})
 
-	ws := NewWebSocket(context.Background(), conn, w)
+	ws := wss.New(context.Background(), conn, w)
 	err := ws.Init()
 	if err != nil {
 		t.Fatalf("TestPublisher_Run")
 	}
 
-	GetNotifyCenter().Start()
+	Center().Start()
 
 	go func() {
-		DoWebSocketListAndWatch(context.Background(), "", nil, conn)
+		wss.ListAndWatch(context.Background(), "", nil, conn)
 
 		w2 := NewInstanceEventListWatcher("g", "s", func() (results []*registry.WatchInstanceResponse, rev int64) {
 			return
 		})
-		ws2 := NewWebSocket(context.Background(), conn, w2)
+		ws2 := wss.New(context.Background(), conn, w2)
 		err := ws2.Init()
 		if err != nil {
 			t.Fatalf("TestPublisher_Run")
 		}
 	}()
 
-	go ws.HandleWatchWebSocketControlMessage()
+	go ws.HandleControlMessage()
 
 	w.OnMessage(nil)
 	w.OnMessage(&InstanceEvent{})
 
-	GetNotifyCenter().Publish(NewInstanceEvent("g", "s", 1, &registry.WatchInstanceResponse{
+	Center().Publish(NewInstanceEvent("g", "s", 1, &registry.WatchInstanceResponse{
 		Response: registry.CreateResponse(registry.ResponseSuccess, "ok"),
 		Action:   string(registry.EVT_CREATE),
 		Key:      &registry.MicroServiceKey{},
@@ -111,12 +112,12 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
 
 	<-time.After(time.Second)
 
-	ws.HandleWatchWebSocketJob(nil)
+	ws.HandleEvent(nil)
 
 	ws.Heartbeat(websocket.PingMessage)
 	ws.Heartbeat(websocket.PongMessage)
 
-	ws.HandleWatchWebSocketJob(time.Now())
+	ws.HandleEvent(time.Now())
 
 	closeCh <- struct{}{}
 
@@ -127,5 +128,5 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
 
 	w.OnMessage(nil)
 
-	Instance().Stop()
+	wss.Instance().Stop()
 }
diff --git a/server/health/health_test.go b/server/health/health_test.go
index afd5786..91e1c60 100644
--- a/server/health/health_test.go
+++ b/server/health/health_test.go
@@ -23,7 +23,7 @@ import (
 )
 
 func TestDefaultHealthChecker_Healthy(t *testing.T) {
-	notify.GetNotifyCenter().Start()
+	notify.Center().Start()
 
 	// normal case
 	var hc DefaultHealthChecker
diff --git a/server/notify/center.go b/server/notify/center.go
index 4a0ad80..35fa0e5 100644
--- a/server/notify/center.go
+++ b/server/notify/center.go
@@ -19,13 +19,12 @@ import (
 	"github.com/apache/servicecomb-service-center/pkg/notify"
 )
 
-var INSTANCE = notify.RegisterType("INSTANCE", InstanceEventQueueSize)
 var notifyService *notify.Service
 
 func init() {
 	notifyService = notify.NewNotifyService()
 }
 
-func GetNotifyCenter() *notify.Service {
+func Center() *notify.Service {
 	return notifyService
 }
diff --git a/server/notify/common.go b/server/notify/common.go
deleted file mode 100644
index d0db027..0000000
--- a/server/notify/common.go
+++ /dev/null
@@ -1,29 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package notify
-
-import "time"
-
-const (
-	AddJobTimeout          = 1 * time.Second
-	HeartbeatInterval      = 30 * time.Second
-	ReadTimeout            = HeartbeatInterval * 4
-	SendTimeout            = 5 * time.Second
-	InstanceEventQueueSize = 5000
-	ReadMaxBody            = 64
-	Websocket              = "Websocket"
-	GRPC                   = "gRPC"
-)
diff --git a/server/notify/listwatcher.go b/server/notify/instance_subscriber.go
similarity index 96%
rename from server/notify/listwatcher.go
rename to server/notify/instance_subscriber.go
index 6de175f..4897e49 100644
--- a/server/notify/listwatcher.go
+++ b/server/notify/instance_subscriber.go
@@ -27,6 +27,13 @@ import (
 	"time"
 )
 
+const (
+	AddJobTimeout  = 1 * time.Second
+	EventQueueSize = 5000
+)
+
+var INSTANCE = notify.RegisterType("INSTANCE", EventQueueSize)
+
 // 状态变化推送
 type InstanceEvent struct {
 	notify.Event
@@ -45,7 +52,7 @@ type InstanceEventListWatcher struct {
 func (w *InstanceEventListWatcher) SetError(err error) {
 	w.Subscriber.SetError(err)
 	// 触发清理job
-	e := w.Service().Publish(notify.NewNotifyServiceHealthCheckJob(w))
+	e := w.Service().Publish(notify.NewErrEvent(w))
 	if e != nil {
 		log.Error("", e)
 	}
diff --git a/server/server.go b/server/server.go
index 7f2ac4c..8f13147 100644
--- a/server/server.go
+++ b/server/server.go
@@ -92,7 +92,7 @@ func (s *ServiceCenterServer) initialize() {
 	// Datasource
 	s.initDatasource()
 	s.apiService = GetAPIServer()
-	s.notifyService = notify.GetNotifyCenter()
+	s.notifyService = notify.Center()
 }
 
 func (s *ServiceCenterServer) initEndpoints() {
diff --git a/server/service/watch.go b/server/service/watch.go
index 77a1a63..3f6341a 100644
--- a/server/service/watch.go
+++ b/server/service/watch.go
@@ -25,7 +25,8 @@ import (
 	"github.com/apache/servicecomb-service-center/pkg/proto"
 	pb "github.com/apache/servicecomb-service-center/pkg/registry"
 	"github.com/apache/servicecomb-service-center/pkg/util"
-	"github.com/apache/servicecomb-service-center/server/notify"
+	"github.com/apache/servicecomb-service-center/server/connection/grpc"
+	"github.com/apache/servicecomb-service-center/server/connection/ws"
 	"github.com/gorilla/websocket"
 )
 
@@ -47,25 +48,25 @@ func (s *InstanceService) Watch(in *pb.WatchInstanceRequest, stream proto.Servic
 		return err
 	}
 
-	return notify.DoStreamListAndWatch(stream.Context(), in.SelfServiceId, nil, stream)
+	return grpc.ListAndWatch(stream.Context(), in.SelfServiceId, nil, stream)
 }
 
 func (s *InstanceService) WebSocketWatch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) {
 	log.Infof("new a web socket watch with service[%s]", in.SelfServiceId)
 	if err := s.WatchPreOpera(ctx, in); err != nil {
-		notify.EstablishWebSocketError(conn, err)
+		ws.SendEstablishError(conn, err)
 		return
 	}
-	notify.DoWebSocketListAndWatch(ctx, in.SelfServiceId, nil, conn)
+	ws.ListAndWatch(ctx, in.SelfServiceId, nil, conn)
 }
 
 func (s *InstanceService) WebSocketListAndWatch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) {
 	log.Infof("new a web socket list and watch with service[%s]", in.SelfServiceId)
 	if err := s.WatchPreOpera(ctx, in); err != nil {
-		notify.EstablishWebSocketError(conn, err)
+		ws.SendEstablishError(conn, err)
 		return
 	}
-	notify.DoWebSocketListAndWatch(ctx, in.SelfServiceId, func() ([]*pb.WatchInstanceResponse, int64) {
+	ws.ListAndWatch(ctx, in.SelfServiceId, func() ([]*pb.WatchInstanceResponse, int64) {
 		return serviceUtil.QueryAllProvidersInstances(ctx, in.SelfServiceId)
 	}, conn)
 }