You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/05/18 07:11:38 UTC

[servicecomb-service-center] branch master updated: SCB-2176 Refactor event bus (#977)

This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new e9b3f1a  SCB-2176 Refactor event bus (#977)
e9b3f1a is described below

commit e9b3f1a32b9ebc8ccd10f07f142615569c331111
Author: little-cui <su...@qq.com>
AuthorDate: Tue May 18 15:11:32 2021 +0800

    SCB-2176 Refactor event bus (#977)
    
    * SCB-2176 Refactor event bus
    
    * SCB-2176 Fix: UT failures
---
 datasource/etcd/event/instance_event_handler.go    |  8 +--
 datasource/etcd/event/rule_event_handler.go        |  4 +-
 datasource/etcd/event/tag_event_handler.go         |  4 +-
 datasource/etcd/sd/k8s/adaptor/listwatcher.go      |  6 +-
 datasource/mongo/event/instance_event_handler.go   |  6 +-
 pkg/{notify/processor.go => event/bus.go}          | 58 ++++++++++---------
 .../bus_service.go}                                | 65 +++++++++++----------
 .../bus_service_test.go}                           | 12 ++--
 .../processor_test.go => event/bus_test.go}        | 12 ++--
 pkg/{notify => event}/common.go                    |  6 +-
 pkg/{notify/notice.go => event/event.go}           |  2 +-
 pkg/{notify/notice_test.go => event/event_test.go} |  7 +--
 pkg/{notify/subject.go => event/poster.go}         | 41 +++++++------
 .../subject_test.go => event/poster_test.go}       | 16 +++---
 pkg/{notify => event}/subscriber.go                | 28 ++++-----
 pkg/event/subscriber_checker.go                    | 63 ++++++++++++++++++++
 pkg/{notify/group.go => event/subscriber_group.go} | 38 ++++++------
 .../subscriber_group_test.go}                      | 25 ++++----
 pkg/{notify => event}/types.go                     | 14 +++--
 pkg/{notify => event}/types_test.go                |  9 +--
 pkg/notify/subscriber_checker.go                   | 67 ----------------------
 pkg/queue/taskqueue.go                             |  6 +-
 pkg/queue/taskqueue_test.go                        | 10 ++--
 pkg/notify/common.go => server/alarm/center.go     | 11 ++--
 server/alarm/common.go                             |  5 +-
 server/alarm/model/types.go                        |  2 +-
 server/alarm/service.go                            | 17 ++----
 server/connection/grpc/stream.go                   |  8 +--
 server/connection/grpc/stream_test.go              |  6 +-
 server/connection/ws/websocket.go                  | 14 ++---
 server/connection/ws/websocket_test.go             | 22 ++++---
 server/{notify => event}/center.go                 | 15 +++--
 server/{notify => event}/instance_subscriber.go    | 24 ++++----
 server/health/health_test.go                       |  7 +--
 server/metrics/connection.go                       |  4 +-
 server/server.go                                   | 14 ++---
 36 files changed, 331 insertions(+), 325 deletions(-)

diff --git a/datasource/etcd/event/instance_event_handler.go b/datasource/etcd/event/instance_event_handler.go
index d5058e9..7297b7c 100644
--- a/datasource/etcd/event/instance_event_handler.go
+++ b/datasource/etcd/event/instance_event_handler.go
@@ -33,8 +33,8 @@ import (
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 	"github.com/apache/servicecomb-service-center/server/core"
+	"github.com/apache/servicecomb-service-center/server/event"
 	"github.com/apache/servicecomb-service-center/server/metrics"
-	"github.com/apache/servicecomb-service-center/server/notify"
 	"github.com/apache/servicecomb-service-center/server/syncernotify"
 )
 
@@ -104,7 +104,7 @@ func (h *InstanceEventHandler) OnEvent(evt sd.KvEvent) {
 		NotifySyncerInstanceEvent(evt, domainProject, ms)
 	}
 
-	if notify.Center().Closed() {
+	if event.Center().Closed() {
 		log.Warn(fmt.Sprintf("caught [%s] instance[%s/%s] event, endpoints %v, but notify service is closed",
 			action, providerID, providerInstanceID, instance.Endpoints))
 		return
@@ -147,8 +147,8 @@ func PublishInstanceEvent(evt sd.KvEvent, domainProject string, serviceKey *pb.M
 	}
 	for _, consumerID := range subscribers {
 		// TODO add超时怎么处理?
-		evt := notify.NewInstanceEventWithTime(consumerID, domainProject, evt.Revision, evt.CreateAt, response)
-		err := notify.Center().Publish(evt)
+		evt := event.NewInstanceEventWithTime(consumerID, domainProject, evt.Revision, evt.CreateAt, response)
+		err := event.Center().Fire(evt)
 		if err != nil {
 			log.Errorf(err, "publish event[%v] into channel failed", evt)
 		}
diff --git a/datasource/etcd/event/rule_event_handler.go b/datasource/etcd/event/rule_event_handler.go
index 1c59eef..9955f23 100644
--- a/datasource/etcd/event/rule_event_handler.go
+++ b/datasource/etcd/event/rule_event_handler.go
@@ -32,7 +32,7 @@ import (
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/task"
 	"github.com/apache/servicecomb-service-center/pkg/util"
-	"github.com/apache/servicecomb-service-center/server/notify"
+	"github.com/apache/servicecomb-service-center/server/event"
 )
 
 type RulesChangedTask struct {
@@ -100,7 +100,7 @@ func (h *RuleEventHandler) OnEvent(evt sd.KvEvent) {
 	}
 
 	providerID, ruleID, domainProject := path.GetInfoFromRuleKV(evt.KV.Key)
-	if notify.Center().Closed() {
+	if event.Center().Closed() {
 		log.Warnf("caught [%s] service rule[%s/%s] event, but notify service is closed",
 			action, providerID, ruleID)
 		return
diff --git a/datasource/etcd/event/tag_event_handler.go b/datasource/etcd/event/tag_event_handler.go
index 46fadeb..17a2d31 100644
--- a/datasource/etcd/event/tag_event_handler.go
+++ b/datasource/etcd/event/tag_event_handler.go
@@ -33,7 +33,7 @@ import (
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/task"
 	"github.com/apache/servicecomb-service-center/pkg/util"
-	"github.com/apache/servicecomb-service-center/server/notify"
+	"github.com/apache/servicecomb-service-center/server/event"
 )
 
 type TagsChangedTask struct {
@@ -114,7 +114,7 @@ func (h *TagEventHandler) OnEvent(evt sd.KvEvent) {
 
 	consumerID, domainProject := path.GetInfoFromTagKV(evt.KV.Key)
 
-	if notify.Center().Closed() {
+	if event.Center().Closed() {
 		log.Warnf("caught [%s] service tags[%s/%s] event, but notify service is closed",
 			action, consumerID, evt.KV.Value)
 		return
diff --git a/datasource/etcd/sd/k8s/adaptor/listwatcher.go b/datasource/etcd/sd/k8s/adaptor/listwatcher.go
index cf511bb..87af286 100644
--- a/datasource/etcd/sd/k8s/adaptor/listwatcher.go
+++ b/datasource/etcd/sd/k8s/adaptor/listwatcher.go
@@ -60,16 +60,16 @@ func NewListWatcher(t K8sType, lister cache.SharedIndexInformer, f OnEventFunc)
 	lw.AddEventHandler(
 		cache.ResourceEventHandlerFuncs{
 			AddFunc: func(obj interface{}) {
-				Queue(t).Add(queue.Task{Object: K8sEvent{EventType: pb.EVT_CREATE, Object: obj}})
+				Queue(t).Add(queue.Task{Payload: K8sEvent{EventType: pb.EVT_CREATE, Object: obj}})
 			},
 			UpdateFunc: func(old, new interface{}) {
 				if !reflect.DeepEqual(old, new) {
-					Queue(t).Add(queue.Task{Object: K8sEvent{EventType: pb.EVT_UPDATE, Object: new,
+					Queue(t).Add(queue.Task{Payload: K8sEvent{EventType: pb.EVT_UPDATE, Object: new,
 						PrevObject: old}})
 				}
 			},
 			DeleteFunc: func(obj interface{}) {
-				Queue(t).Add(queue.Task{Object: K8sEvent{EventType: pb.EVT_DELETE, Object: obj}})
+				Queue(t).Add(queue.Task{Payload: K8sEvent{EventType: pb.EVT_DELETE, Object: obj}})
 			},
 		})
 	Queue(t).AddWorker(lw)
diff --git a/datasource/mongo/event/instance_event_handler.go b/datasource/mongo/event/instance_event_handler.go
index 9d85aba..a32c5ce 100644
--- a/datasource/mongo/event/instance_event_handler.go
+++ b/datasource/mongo/event/instance_event_handler.go
@@ -34,8 +34,8 @@ import (
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	simple "github.com/apache/servicecomb-service-center/pkg/time"
 	"github.com/apache/servicecomb-service-center/pkg/util"
+	"github.com/apache/servicecomb-service-center/server/event"
 	"github.com/apache/servicecomb-service-center/server/metrics"
-	"github.com/apache/servicecomb-service-center/server/notify"
 	"github.com/apache/servicecomb-service-center/server/syncernotify"
 )
 
@@ -109,8 +109,8 @@ func PublishInstanceEvent(evt sd.MongoEvent, domainProject string, serviceKey *d
 		Instance: evt.Value.(model.Instance).Instance,
 	}
 	for _, consumerID := range subscribers {
-		evt := notify.NewInstanceEventWithTime(consumerID, domainProject, -1, simple.FromTime(time.Now()), response)
-		err := notify.Center().Publish(evt)
+		evt := event.NewInstanceEventWithTime(consumerID, domainProject, -1, simple.FromTime(time.Now()), response)
+		err := event.Center().Fire(evt)
 		if err != nil {
 			log.Error(fmt.Sprintf("publish event[%v] into channel failed", evt), err)
 		}
diff --git a/pkg/notify/processor.go b/pkg/event/bus.go
similarity index 54%
rename from pkg/notify/processor.go
rename to pkg/event/bus.go
index 1bee22d..a573381 100644
--- a/pkg/notify/processor.go
+++ b/pkg/event/bus.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package notify
+package event
 
 import (
 	"context"
@@ -24,74 +24,76 @@ import (
 	"github.com/apache/servicecomb-service-center/pkg/util"
 )
 
-type Processor struct {
+// Bus can fire events asynchronously and dispatch them to subscribers according to subject
+type Bus struct {
 	*queue.TaskQueue
 
 	name     string
 	subjects *util.ConcurrentMap
 }
 
-func (p *Processor) Name() string {
-	return p.name
+func (bus *Bus) Name() string {
+	return bus.name
 }
 
-func (p *Processor) Accept(evt Event) {
-	p.Add(queue.Task{Object: evt})
+func (bus *Bus) Fire(evt Event) {
+	// TODO add option if queue is full
+	bus.Add(queue.Task{Payload: evt})
 }
 
-func (p *Processor) Handle(ctx context.Context, obj interface{}) {
-	p.Notify(obj.(Event))
+func (bus *Bus) Handle(ctx context.Context, evt interface{}) {
+	bus.fireAtOnce(evt.(Event))
 }
 
-func (p *Processor) Notify(evt Event) {
-	if itf, ok := p.subjects.Get(evt.Subject()); ok {
-		itf.(*Subject).Notify(evt)
+func (bus *Bus) fireAtOnce(evt Event) {
+	if itf, ok := bus.subjects.Get(evt.Subject()); ok {
+		itf.(*Poster).Post(evt)
 	}
 }
 
-func (p *Processor) Subjects(name string) *Subject {
-	itf, ok := p.subjects.Get(name)
+func (bus *Bus) Subjects(name string) *Poster {
+	itf, ok := bus.subjects.Get(name)
 	if !ok {
 		return nil
 	}
-	return itf.(*Subject)
+	return itf.(*Poster)
 }
 
-func (p *Processor) AddSubscriber(n Subscriber) {
-	item, _ := p.subjects.Fetch(n.Subject(), func() (interface{}, error) {
-		return NewSubject(n.Subject()), nil
+func (bus *Bus) AddSubscriber(n Subscriber) {
+	item, _ := bus.subjects.Fetch(n.Subject(), func() (interface{}, error) {
+		return NewPoster(n.Subject()), nil
 	})
-	item.(*Subject).GetOrNewGroup(n.Group()).AddSubscriber(n)
+	item.(*Poster).GetOrNewGroup(n.Group()).AddMember(n)
 }
 
-func (p *Processor) Remove(n Subscriber) {
-	itf, ok := p.subjects.Get(n.Subject())
+func (bus *Bus) RemoveSubscriber(n Subscriber) {
+	itf, ok := bus.subjects.Get(n.Subject())
 	if !ok {
 		return
 	}
 
-	s := itf.(*Subject)
+	s := itf.(*Poster)
 	g := s.Groups(n.Group())
 	if g == nil {
 		return
 	}
 
-	g.Remove(n.ID())
+	g.RemoveMember(n.ID())
 
 	if g.Size() == 0 {
-		s.Remove(g.Name())
+		s.RemoveGroup(g.Name())
 	}
 	if s.Size() == 0 {
-		p.subjects.Remove(s.Name())
+		bus.subjects.Remove(s.Subject())
 	}
 }
 
-func (p *Processor) Clear() {
-	p.subjects.Clear()
+func (bus *Bus) Clear() {
+	bus.subjects.Clear()
 }
 
-func NewProcessor(name string, queueSize int) *Processor {
-	p := &Processor{
+func NewBus(name string, queueSize int) *Bus {
+	p := &Bus{
 		TaskQueue: queue.NewTaskQueue(queueSize),
 		name:      name,
 		subjects:  util.NewConcurrentMap(0),
diff --git a/pkg/notify/notification_service.go b/pkg/event/bus_service.go
similarity index 66%
rename from pkg/notify/notification_service.go
rename to pkg/event/bus_service.go
index d63cace..2b53597 100644
--- a/pkg/notify/notification_service.go
+++ b/pkg/event/bus_service.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package notify
+package event
 
 import (
 	"errors"
@@ -25,15 +25,18 @@ import (
 	"github.com/apache/servicecomb-service-center/pkg/log"
 )
 
-type Service struct {
-	processors map[Type]*Processor
-	mux        sync.RWMutex
-	isClose    bool
+// BusService is the daemon service that manages multiple types of Bus
+// and wraps the handling methods of Bus
+type BusService struct {
+	// buses maps each event source type to the Bus that handles it
+	buses   map[Type]*Bus
+	mux     sync.RWMutex
+	isClose bool
 }
 
-func (s *Service) newProcessor(t Type) *Processor {
+func (s *BusService) newBus(t Type) *Bus {
 	s.mux.RLock()
-	p, ok := s.processors[t]
+	p, ok := s.buses[t]
 	if ok {
 		s.mux.RUnlock()
 		return p
@@ -41,20 +44,20 @@ func (s *Service) newProcessor(t Type) *Processor {
 	s.mux.RUnlock()
 
 	s.mux.Lock()
-	p, ok = s.processors[t]
+	p, ok = s.buses[t]
 	if ok {
 		s.mux.Unlock()
 		return p
 	}
-	p = NewProcessor(t.String(), t.QueueSize())
-	s.processors[t] = p
+	p = NewBus(t.String(), t.QueueSize())
+	s.buses[t] = p
 	s.mux.Unlock()
 
 	p.Run()
 	return p
 }
 
-func (s *Service) Start() {
+func (s *BusService) Start() {
 	if !s.Closed() {
 		log.Warnf("notify service is already running")
 		return
@@ -64,7 +67,7 @@ func (s *Service) Start() {
 	s.mux.Unlock()
 
 	// 错误subscriber清理
-	err := s.AddSubscriber(NewSubscriberChecker())
+	err := s.AddSubscriber(NewSubscriberHealthChecker())
 	if err != nil {
 		log.Error("", err)
 	}
@@ -72,7 +75,7 @@ func (s *Service) Start() {
 	log.Debugf("notify service is started")
 }
 
-func (s *Service) AddSubscriber(n Subscriber) error {
+func (s *BusService) AddSubscriber(n Subscriber) error {
 	if n == nil {
 		err := errors.New("required Subscriber")
 		log.Errorf(err, "add subscriber failed")
@@ -85,30 +88,30 @@ func (s *Service) AddSubscriber(n Subscriber) error {
 		return err
 	}
 
-	p := s.newProcessor(n.Type())
-	n.SetService(s)
+	p := s.newBus(n.Type())
+	n.SetBus(s)
 	n.OnAccept()
 
 	p.AddSubscriber(n)
 	return nil
 }
 
-func (s *Service) RemoveSubscriber(n Subscriber) {
+func (s *BusService) RemoveSubscriber(n Subscriber) {
 	s.mux.RLock()
-	p, ok := s.processors[n.Type()]
+	p, ok := s.buses[n.Type()]
 	if !ok {
 		s.mux.RUnlock()
 		return
 	}
 	s.mux.RUnlock()
 
-	p.Remove(n)
+	p.RemoveSubscriber(n)
 	n.Close()
 }
 
-func (s *Service) stopProcessors() {
+func (s *BusService) closeBuses() {
 	s.mux.RLock()
-	for _, p := range s.processors {
+	for _, p := range s.buses {
 		p.Clear()
 		p.Stop()
 	}
@@ -116,30 +119,30 @@ func (s *Service) stopProcessors() {
 }
 
 //通知内容塞到队列里
-func (s *Service) Publish(evt Event) error {
+func (s *BusService) Fire(evt Event) error {
 	if s.Closed() {
-		return errors.New("add notify event failed for server shutdown")
+		return errors.New("add notify evt failed for server shutdown")
 	}
 
 	s.mux.RLock()
-	p, ok := s.processors[evt.Type()]
+	bus, ok := s.buses[evt.Type()]
 	if !ok {
 		s.mux.RUnlock()
 		return fmt.Errorf("unknown event type[%s]", evt.Type())
 	}
 	s.mux.RUnlock()
-	p.Accept(evt)
+	bus.Fire(evt)
 	return nil
 }
 
-func (s *Service) Closed() (b bool) {
+func (s *BusService) Closed() (b bool) {
 	s.mux.RLock()
 	b = s.isClose
 	s.mux.RUnlock()
 	return
 }
 
-func (s *Service) Stop() {
+func (s *BusService) Stop() {
 	if s.Closed() {
 		return
 	}
@@ -147,14 +150,14 @@ func (s *Service) Stop() {
 	s.isClose = true
 	s.mux.Unlock()
 
-	s.stopProcessors()
+	s.closeBuses()
 
 	log.Debug("notify service stopped")
 }
 
-func NewNotifyService() *Service {
-	return &Service{
-		processors: make(map[Type]*Processor),
-		isClose:    true,
+func NewBusService() *BusService {
+	return &BusService{
+		buses:   make(map[Type]*Bus),
+		isClose: true,
 	}
 }
diff --git a/pkg/notify/notification_test.go b/pkg/event/bus_service_test.go
similarity index 90%
rename from pkg/notify/notification_test.go
rename to pkg/event/bus_service_test.go
index ba23613..c6ae242 100644
--- a/pkg/notify/notification_test.go
+++ b/pkg/event/bus_service_test.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package notify
+package event
 
 import (
 	"testing"
@@ -27,7 +27,7 @@ import (
 func TestGetNotifyService(t *testing.T) {
 	INSTANCE := RegisterType("INSTANCE", 1)
 
-	notifyService := NewNotifyService()
+	notifyService := NewBusService()
 	if notifyService == nil {
 		t.Fatalf("TestGetNotifyService failed")
 	}
@@ -39,7 +39,7 @@ func TestGetNotifyService(t *testing.T) {
 	if err == nil {
 		t.Fatalf("TestGetNotifyService failed")
 	}
-	err = notifyService.Publish(nil)
+	err = notifyService.Fire(nil)
 	if err == nil {
 		t.Fatalf("TestGetNotifyService failed")
 	}
@@ -63,15 +63,15 @@ func TestGetNotifyService(t *testing.T) {
 		t.Fatalf("TestGetNotifyService failed, %v", err)
 	}
 	j := &baseEvent{INSTANCE, "s", "g", simple.FromTime(time.Now())}
-	err = notifyService.Publish(j)
+	err = notifyService.Fire(j)
 	if err != nil {
 		t.Fatalf("TestGetNotifyService failed")
 	}
-	err = notifyService.Publish(NewErrEvent(NewSubscriberChecker()))
+	err = notifyService.Fire(NewUnhealthyEvent(NewSubscriberHealthChecker()))
 	if err != nil {
 		t.Fatalf("TestGetNotifyService failed")
 	}
-	err = notifyService.Publish(NewErrEvent(s))
+	err = notifyService.Fire(NewUnhealthyEvent(s))
 	if err != nil {
 		t.Fatalf("TestGetNotifyService failed")
 	}
diff --git a/pkg/notify/processor_test.go b/pkg/event/bus_test.go
similarity index 91%
rename from pkg/notify/processor_test.go
rename to pkg/event/bus_test.go
index 894231b..a5e214b 100644
--- a/pkg/notify/processor_test.go
+++ b/pkg/event/bus_test.go
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notify
+package event
 
 import (
 	"testing"
@@ -37,7 +37,7 @@ func TestProcessor_Do(t *testing.T) {
 		job: make(chan Event, 1)}
 	mock2 := &mockSubscriberChan{Subscriber: NewSubscriber(INSTANCE, "s1", "g2"),
 		job: make(chan Event, 1)}
-	p := NewProcessor("p1", 0)
+	p := NewBus("p1", 0)
 	if p.Name() != "p1" {
 		t.Fatalf("TestProcessor_Do")
 	}
@@ -45,12 +45,12 @@ func TestProcessor_Do(t *testing.T) {
 		t.Fatalf("TestProcessor_Do")
 	}
 	p.AddSubscriber(mock1)
-	if p.Subjects(mock1.Subject()).Groups(mock1.Group()).Subscribers(mock1.ID()) != mock1 {
+	if p.Subjects(mock1.Subject()).Groups(mock1.Group()).Member(mock1.ID()) != mock1 {
 		t.Fatalf("TestProcessor_Do")
 	}
-	p.Remove(NewSubscriber(INSTANCE, "s2", "g1"))
-	p.Remove(NewSubscriber(INSTANCE, "s1", "g2"))
-	p.Remove(mock1)
+	p.RemoveSubscriber(NewSubscriber(INSTANCE, "s2", "g1"))
+	p.RemoveSubscriber(NewSubscriber(INSTANCE, "s1", "g2"))
+	p.RemoveSubscriber(mock1)
 	if p.Subjects(mock1.Subject()) != nil {
 		t.Fatalf("TestProcessor_Do")
 	}
diff --git a/pkg/notify/common.go b/pkg/event/common.go
similarity index 95%
copy from pkg/notify/common.go
copy to pkg/event/common.go
index a6b5337..3b2f115 100644
--- a/pkg/notify/common.go
+++ b/pkg/event/common.go
@@ -15,8 +15,12 @@
  * limitations under the License.
  */
 
-package notify
+package event
 
 const (
 	DefaultQueueSize = 1000
 )
+
+const (
+	INNER Type = iota
+)
diff --git a/pkg/notify/notice.go b/pkg/event/event.go
similarity index 99%
rename from pkg/notify/notice.go
rename to pkg/event/event.go
index d20ec7f..d057a80 100644
--- a/pkg/notify/notice.go
+++ b/pkg/event/event.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package notify
+package event
 
 import (
 	"time"
diff --git a/pkg/notify/notice_test.go b/pkg/event/event_test.go
similarity index 87%
rename from pkg/notify/notice_test.go
rename to pkg/event/event_test.go
index 3eba149..ed181fb 100644
--- a/pkg/notify/notice_test.go
+++ b/pkg/event/event_test.go
@@ -13,7 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package notify
+package event
 
 import (
 	"fmt"
@@ -21,14 +21,13 @@ import (
 )
 
 func TestNewEventWithTime(t *testing.T) {
-	regT := RegisterType("N", 1)
-	evt := NewEvent(regT, "a", "b")
+	evt := NewEvent(INNER, "a", "b")
 	if evt.CreateAt().UnixNano() == 0 {
 		t.Fatal("TestNewEventWithTime")
 	}
 	fmt.Println(evt.CreateAt())
 
-	if evt.Type() != regT || evt.Subject() != "a" || evt.Group() != "b" {
+	if evt.Type() != INNER || evt.Subject() != "a" || evt.Group() != "b" {
 		t.Fatal("TestNewEventWithTime")
 	}
 }
diff --git a/pkg/notify/subject.go b/pkg/event/poster.go
similarity index 66%
rename from pkg/notify/subject.go
rename to pkg/event/poster.go
index 4f79ba9..4cb3f49 100644
--- a/pkg/notify/subject.go
+++ b/pkg/event/poster.go
@@ -15,25 +15,32 @@
  * limitations under the License.
  */
 
-package notify
+package event
 
 import (
 	"github.com/apache/servicecomb-service-center/pkg/util"
 )
 
-type Subject struct {
-	name   string
-	groups *util.ConcurrentMap
+// Poster posts the events of the specified subject to the subscribers
+type Poster struct {
+	subject string
+	groups  *util.ConcurrentMap
 }
 
-func (s *Subject) Name() string {
-	return s.name
+func (s *Poster) Subject() string {
+	return s.subject
 }
 
-func (s *Subject) Notify(job Event) {
+func (s *Poster) Post(job Event) {
+	f := func(g *Group) {
+		g.ForEach(func(m Subscriber) {
+			m.OnMessage(job)
+		})
+	}
+
 	if len(job.Group()) == 0 {
 		s.groups.ForEach(func(item util.MapItem) (next bool) {
-			item.Value.(*Group).Notify(job)
+			f(item.Value.(*Group))
 			return true
 		})
 		return
@@ -43,10 +50,10 @@ func (s *Subject) Notify(job Event) {
 	if !ok {
 		return
 	}
-	itf.(*Group).Notify(job)
+	f(itf.(*Group))
 }
 
-func (s *Subject) Groups(name string) *Group {
+func (s *Poster) Groups(name string) *Group {
 	g, ok := s.groups.Get(name)
 	if !ok {
 		return nil
@@ -54,24 +61,24 @@ func (s *Subject) Groups(name string) *Group {
 	return g.(*Group)
 }
 
-func (s *Subject) GetOrNewGroup(name string) *Group {
+func (s *Poster) GetOrNewGroup(name string) *Group {
 	item, _ := s.groups.Fetch(name, func() (interface{}, error) {
 		return NewGroup(name), nil
 	})
 	return item.(*Group)
 }
 
-func (s *Subject) Remove(name string) {
+func (s *Poster) RemoveGroup(name string) {
 	s.groups.Remove(name)
 }
 
-func (s *Subject) Size() int {
+func (s *Poster) Size() int {
 	return s.groups.Size()
 }
 
-func NewSubject(name string) *Subject {
-	return &Subject{
-		name:   name,
-		groups: util.NewConcurrentMap(0),
+func NewPoster(subject string) *Poster {
+	return &Poster{
+		subject: subject,
+		groups:  util.NewConcurrentMap(0),
 	}
 }
diff --git a/pkg/notify/subject_test.go b/pkg/event/poster_test.go
similarity index 92%
rename from pkg/notify/subject_test.go
rename to pkg/event/poster_test.go
index c9e955c..931b5de 100644
--- a/pkg/notify/subject_test.go
+++ b/pkg/event/poster_test.go
@@ -14,13 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notify
+package event
 
 import "testing"
 
 func TestSubject_Fetch(t *testing.T) {
-	s := NewSubject("s1")
-	if s.Name() != "s1" {
+	s := NewPoster("s1")
+	if s.Subject() != "s1" {
 		t.Fatalf("TestSubject_Fetch failed")
 	}
 	g := s.GetOrNewGroup("g1")
@@ -37,7 +37,7 @@ func TestSubject_Fetch(t *testing.T) {
 	if s.Size() != 2 {
 		t.Fatalf("TestSubject_Fetch failed")
 	}
-	s.Remove(o.Name())
+	s.RemoveGroup(o.Name())
 	if s.Groups("g2") != nil {
 		t.Fatalf("TestSubject_Fetch failed")
 	}
@@ -47,19 +47,19 @@ func TestSubject_Fetch(t *testing.T) {
 	INSTANCE := RegisterType("INSTANCE", 1)
 	mock1 := &mockSubscriber{Subscriber: NewSubscriber(INSTANCE, "s1", "g1")}
 	mock2 := &mockSubscriber{Subscriber: NewSubscriber(INSTANCE, "s1", "g2")}
-	g.AddSubscriber(mock1)
+	g.AddMember(mock1)
 	job := &baseEvent{group: "g3"}
-	s.Notify(job)
+	s.Post(job)
 	if mock1.job != nil || mock2.job != nil {
 		t.Fatalf("TestSubject_Fetch failed")
 	}
 	job.group = "g1"
-	s.Notify(job)
+	s.Post(job)
 	if mock1.job != job || mock2.job != nil {
 		t.Fatalf("TestSubject_Fetch failed")
 	}
 	job.group = ""
-	s.Notify(job)
+	s.Post(job)
 	if mock1.job != job && mock2.job != job {
 		t.Fatalf("TestSubject_Fetch failed")
 	}
diff --git a/pkg/notify/subscriber.go b/pkg/event/subscriber.go
similarity index 67%
rename from pkg/notify/subscriber.go
rename to pkg/event/subscriber.go
index ccacb4a..b26060b 100644
--- a/pkg/notify/subscriber.go
+++ b/pkg/event/subscriber.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package notify
+package event
 
 import (
 	"errors"
@@ -28,8 +28,8 @@ type Subscriber interface {
 	Subject() string
 	Group() string
 	Type() Type
-	Service() *Service
-	SetService(*Service)
+	Bus() *BusService
+	SetBus(*BusService)
 
 	Err() error
 	SetError(err error)
@@ -45,20 +45,20 @@ type baseSubscriber struct {
 	id      string
 	subject string
 	group   string
-	service *Service
+	service *BusService
 	err     error
 }
 
-func (s *baseSubscriber) ID() string              { return s.id }
-func (s *baseSubscriber) Subject() string         { return s.subject }
-func (s *baseSubscriber) Group() string           { return s.group }
-func (s *baseSubscriber) Type() Type              { return s.nType }
-func (s *baseSubscriber) Service() *Service       { return s.service }
-func (s *baseSubscriber) SetService(svc *Service) { s.service = svc }
-func (s *baseSubscriber) Err() error              { return s.err }
-func (s *baseSubscriber) SetError(err error)      { s.err = err }
-func (s *baseSubscriber) Close()                  {}
-func (s *baseSubscriber) OnAccept()               {}
+func (s *baseSubscriber) ID() string             { return s.id }
+func (s *baseSubscriber) Subject() string        { return s.subject }
+func (s *baseSubscriber) Group() string          { return s.group }
+func (s *baseSubscriber) Type() Type             { return s.nType }
+func (s *baseSubscriber) Bus() *BusService       { return s.service }
+func (s *baseSubscriber) SetBus(svc *BusService) { s.service = svc }
+func (s *baseSubscriber) Err() error             { return s.err }
+func (s *baseSubscriber) SetError(err error)     { s.err = err }
+func (s *baseSubscriber) Close()                 {}
+func (s *baseSubscriber) OnAccept()              {}
 func (s *baseSubscriber) OnMessage(job Event) {
 	s.SetError(errors.New("do not call base notifier OnMessage method"))
 }
diff --git a/pkg/event/subscriber_checker.go b/pkg/event/subscriber_checker.go
new file mode 100644
index 0000000..28c4cb6
--- /dev/null
+++ b/pkg/event/subscriber_checker.go
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import "github.com/apache/servicecomb-service-center/pkg/log"
+
+const (
+	CheckerGroup   = "__HealthChecker__"
+	CheckerSubject = "__NotifyServerHealthCheck__"
+)
+
+// SubscriberHealthChecker checks subscriber health and removes subscribers reported as unhealthy from the bus
+type SubscriberHealthChecker struct {
+	Subscriber
+}
+
+type UnhealthyEvent struct {
+	Event
+	ErrorSubscriber Subscriber
+}
+
+func (s *SubscriberHealthChecker) OnMessage(evt Event) {
+	j := evt.(*UnhealthyEvent)
+	err := j.ErrorSubscriber.Err()
+
+	if j.ErrorSubscriber.Type() == INNER {
+		log.Errorf(nil, "remove %s watcher failed, here cause a dead lock, subject: %s, group: %s",
+			j.ErrorSubscriber.Type(), j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Group())
+		return
+	}
+
+	log.Debugf("notification service remove %s watcher, error: %v, subject: %s, group: %s",
+		j.ErrorSubscriber.Type(), err, j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Group())
+	s.Bus().RemoveSubscriber(j.ErrorSubscriber)
+}
+
+func NewSubscriberHealthChecker() *SubscriberHealthChecker {
+	return &SubscriberHealthChecker{
+		Subscriber: NewSubscriber(INNER, CheckerSubject, CheckerGroup),
+	}
+}
+
+func NewUnhealthyEvent(s Subscriber) *UnhealthyEvent {
+	return &UnhealthyEvent{
+		Event:           NewEvent(INNER, CheckerSubject, CheckerGroup),
+		ErrorSubscriber: s,
+	}
+}
diff --git a/pkg/notify/group.go b/pkg/event/subscriber_group.go
similarity index 65%
rename from pkg/notify/group.go
rename to pkg/event/subscriber_group.go
index c600bba..0c41f4a 100644
--- a/pkg/notify/group.go
+++ b/pkg/event/subscriber_group.go
@@ -15,51 +15,51 @@
  * limitations under the License.
  */
 
-package notify
+package event
 
 import (
 	"github.com/apache/servicecomb-service-center/pkg/util"
 )
 
 type Group struct {
-	name        string
-	subscribers *util.ConcurrentMap
+	name    string
+	members *util.ConcurrentMap
 }
 
 func (g *Group) Name() string {
 	return g.name
 }
 
-func (g *Group) Notify(job Event) {
-	g.subscribers.ForEach(func(item util.MapItem) (next bool) {
-		item.Value.(Subscriber).OnMessage(job)
-		return true
-	})
-}
-
-func (g *Group) Subscribers(name string) Subscriber {
-	s, ok := g.subscribers.Get(name)
+func (g *Group) Member(name string) Subscriber {
+	s, ok := g.members.Get(name)
 	if !ok {
 		return nil
 	}
 	return s.(Subscriber)
 }
 
-func (g *Group) AddSubscriber(subscriber Subscriber) Subscriber {
-	return g.subscribers.PutIfAbsent(subscriber.ID(), subscriber).(Subscriber)
+func (g *Group) ForEach(iter func(m Subscriber)) {
+	g.members.ForEach(func(item util.MapItem) (next bool) {
+		iter(item.Value.(Subscriber))
+		return true
+	})
+}
+
+func (g *Group) AddMember(subscriber Subscriber) Subscriber {
+	return g.members.PutIfAbsent(subscriber.ID(), subscriber).(Subscriber)
 }
 
-func (g *Group) Remove(name string) {
-	g.subscribers.Remove(name)
+func (g *Group) RemoveMember(name string) {
+	g.members.Remove(name)
 }
 
 func (g *Group) Size() int {
-	return g.subscribers.Size()
+	return g.members.Size()
 }
 
 func NewGroup(name string) *Group {
 	return &Group{
-		name:        name,
-		subscribers: util.NewConcurrentMap(0),
+		name:    name,
+		members: util.NewConcurrentMap(0),
 	}
 }
diff --git a/pkg/notify/group_test.go b/pkg/event/subscriber_group_test.go
similarity index 79%
rename from pkg/notify/group_test.go
rename to pkg/event/subscriber_group_test.go
index 81040a7..b0dc22d 100644
--- a/pkg/notify/group_test.go
+++ b/pkg/event/subscriber_group_test.go
@@ -14,9 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notify
+package event
 
-import "testing"
+import (
+	"testing"
+)
 
 type mockSubscriber struct {
 	Subscriber
@@ -34,37 +36,32 @@ func TestGroup_Add(t *testing.T) {
 	if g.Name() != "g1" {
 		t.Fatalf("TestGroup_Add failed")
 	}
-	if g.AddSubscriber(m) != m {
+	if g.AddMember(m) != m {
 		t.Fatalf("TestGroup_Add failed")
 	}
-	if g.AddSubscriber(NewSubscriber(INSTANCE, "s1", "g1")) == m {
+	if g.AddMember(NewSubscriber(INSTANCE, "s1", "g1")) == m {
 		t.Fatalf("TestGroup_Add failed")
 	}
 	same := *(m.(*baseSubscriber))
-	if g.AddSubscriber(&same) != m {
+	if g.AddMember(&same) != m {
 		t.Fatalf("TestGroup_Add failed")
 	}
 	if g.Size() != 2 {
 		t.Fatalf("TestGroup_Add failed")
 	}
-	g.Remove(m.ID())
+	g.RemoveMember(m.ID())
 	if g.Size() != 1 {
 		t.Fatalf("TestGroup_Add failed")
 	}
-	if g.Subscribers(m.ID()) == m {
+	if g.Member(m.ID()) == m {
 		t.Fatalf("TestGroup_Add failed")
 	}
 
 	mock := &mockSubscriber{Subscriber: NewSubscriber(INSTANCE, "s1", "g1")}
-	if g.AddSubscriber(mock) != mock {
+	if g.AddMember(mock) != mock {
 		t.Fatalf("TestGroup_Add failed")
 	}
-	if g.Subscribers(mock.ID()) != mock {
-		t.Fatalf("TestGroup_Add failed")
-	}
-	job := &baseEvent{nType: INSTANCE}
-	g.Notify(job)
-	if mock.job != job {
+	if g.Member(mock.ID()) != mock {
 		t.Fatalf("TestGroup_Add failed")
 	}
 }
diff --git a/pkg/notify/types.go b/pkg/event/types.go
similarity index 92%
rename from pkg/notify/types.go
rename to pkg/event/types.go
index 3cf125c..d42377b 100644
--- a/pkg/notify/types.go
+++ b/pkg/event/types.go
@@ -13,11 +13,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package notify
+package event
 
-import (
-	"strconv"
-)
+import "strconv"
 
 type Type int
 
@@ -42,9 +40,13 @@ func (nt Type) IsValid() bool {
 	return nt >= 0 && int(nt) < len(typeQueues)
 }
 
-var typeNames []string
+var typeNames = []string{
+	INNER: "INNER",
+}
 
-var typeQueues []int
+var typeQueues = []int{
+	INNER: 0,
+}
 
 func Types() (ts []Type) {
 	for i := range typeNames {
diff --git a/pkg/notify/types_test.go b/pkg/event/types_test.go
similarity index 88%
rename from pkg/notify/types_test.go
rename to pkg/event/types_test.go
index c209c31..35e5a66 100644
--- a/pkg/notify/types_test.go
+++ b/pkg/event/types_test.go
@@ -13,11 +13,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package notify
+package event
 
-import (
-	"testing"
-)
+import "testing"
 
 func TestRegisterType(t *testing.T) {
 	id := RegisterType("a", 0)
@@ -32,4 +30,7 @@ func TestRegisterType(t *testing.T) {
 	if id.String() != "Type999" || id.QueueSize() != DefaultQueueSize {
 		t.Fatal("TestRegisterType failed", id.String(), id.QueueSize())
 	}
+	if INNER.String() != "INNER" || INNER.QueueSize() != DefaultQueueSize {
+		t.Fatal("TestRegisterType failed", id.String(), id.QueueSize())
+	}
 }
diff --git a/pkg/notify/subscriber_checker.go b/pkg/notify/subscriber_checker.go
deleted file mode 100644
index af73396..0000000
--- a/pkg/notify/subscriber_checker.go
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package notify
-
-import (
-	"github.com/apache/servicecomb-service-center/pkg/log"
-)
-
-const (
-	groupCheck   = "__HealthChecker__"
-	subjectCheck = "__SubscriberHealthCheck__"
-)
-
-var TypeCheck = RegisterType("CHECKER", DefaultQueueSize)
-
-//Notifier 健康检查
-type SubscriberChecker struct {
-	Subscriber
-}
-
-type ErrEvent struct {
-	Event
-	Subscriber Subscriber
-}
-
-func (s *SubscriberChecker) OnMessage(evt Event) {
-	j := evt.(*ErrEvent)
-	err := j.Subscriber.Err()
-
-	if j.Subscriber.Type() == TypeCheck {
-		log.Errorf(nil, "remove %s subscriber failed, here cause a dead lock, subject: %s, group: %s",
-			j.Subscriber.Type(), j.Subscriber.Subject(), j.Subscriber.Group())
-		return
-	}
-
-	log.Debugf("notification service remove %s subscriber, error: %v, subject: %s, group: %s",
-		j.Subscriber.Type(), err, j.Subscriber.Subject(), j.Subscriber.Group())
-	s.Service().RemoveSubscriber(j.Subscriber)
-}
-
-func NewSubscriberChecker() *SubscriberChecker {
-	return &SubscriberChecker{
-		Subscriber: NewSubscriber(TypeCheck, subjectCheck, groupCheck),
-	}
-}
-
-func NewErrEvent(by Subscriber) *ErrEvent {
-	return &ErrEvent{
-		Event:      NewEvent(TypeCheck, subjectCheck, groupCheck),
-		Subscriber: by,
-	}
-}
diff --git a/pkg/queue/taskqueue.go b/pkg/queue/taskqueue.go
index 3eb65f6..1875ff2 100644
--- a/pkg/queue/taskqueue.go
+++ b/pkg/queue/taskqueue.go
@@ -30,7 +30,7 @@ type Worker interface {
 }
 
 type Task struct {
-	Object interface{}
+	Payload interface{}
 	// Async can let workers handle this task concurrently, but
 	// it will make this task unordered
 	Async bool
@@ -62,13 +62,13 @@ func (q *TaskQueue) Do(ctx context.Context, task Task) {
 	if task.Async {
 		for _, w := range q.Workers {
 			q.goroutine.Do(func(ctx context.Context) {
-				q.dispatch(ctx, w, task.Object)
+				q.dispatch(ctx, w, task.Payload)
 			})
 		}
 		return
 	}
 	for _, w := range q.Workers {
-		q.dispatch(ctx, w, task.Object)
+		q.dispatch(ctx, w, task.Payload)
 	}
 }
 
diff --git a/pkg/queue/taskqueue_test.go b/pkg/queue/taskqueue_test.go
index 8c79da4..d0eecd6 100644
--- a/pkg/queue/taskqueue_test.go
+++ b/pkg/queue/taskqueue_test.go
@@ -34,26 +34,26 @@ func TestNewEventQueue(t *testing.T) {
 	q := NewTaskQueue(0)
 	q.AddWorker(h)
 
-	q.Do(context.Background(), Task{Object: 1})
+	q.Do(context.Background(), Task{Payload: 1})
 	if <-h.Object != 1 {
 		t.Fatalf("TestNewEventQueue failed")
 	}
 
-	q.Do(context.Background(), Task{Object: 11, Async: true})
+	q.Do(context.Background(), Task{Payload: 11, Async: true})
 	if <-h.Object != 11 {
 		t.Fatalf("TestNewEventQueue failed")
 	}
 
 	q.Run()
-	q.Add(Task{Object: 2})
+	q.Add(Task{Payload: 2})
 	if <-h.Object != 2 {
 		t.Fatalf("TestNewEventQueue failed")
 	}
 
-	q.Add(Task{Object: 22, Async: true})
+	q.Add(Task{Payload: 22, Async: true})
 	if <-h.Object != 22 {
 		t.Fatalf("TestNewEventQueue failed")
 	}
 	q.Stop()
-	q.Add(Task{Object: 3})
+	q.Add(Task{Payload: 3})
 }
diff --git a/pkg/notify/common.go b/server/alarm/center.go
similarity index 88%
rename from pkg/notify/common.go
rename to server/alarm/center.go
index a6b5337..9af6aa8 100644
--- a/pkg/notify/common.go
+++ b/server/alarm/center.go
@@ -15,8 +15,11 @@
  * limitations under the License.
  */
 
-package notify
+package alarm
 
-const (
-	DefaultQueueSize = 1000
-)
+func Center() *Service {
+	once.Do(func() {
+		service = NewAlarmService()
+	})
+	return service
+}
diff --git a/server/alarm/common.go b/server/alarm/common.go
index 6d5eee3..cb9df7a 100644
--- a/server/alarm/common.go
+++ b/server/alarm/common.go
@@ -17,8 +17,7 @@ package alarm
 
 import (
 	"fmt"
-
-	"github.com/apache/servicecomb-service-center/pkg/notify"
+	"github.com/apache/servicecomb-service-center/pkg/event"
 	"github.com/apache/servicecomb-service-center/server/alarm/model"
 )
 
@@ -43,7 +42,7 @@ const (
 	Group   = "__ALARM_GROUP__"
 )
 
-var ALARM = notify.RegisterType("ALARM", notify.DefaultQueueSize)
+var ALARM = event.RegisterType("ALARM", 0)
 
 func FieldBool(key string, v bool) model.Field {
 	return model.Field{Key: key, Value: v}
diff --git a/server/alarm/model/types.go b/server/alarm/model/types.go
index 0c29d79..40d0a44 100644
--- a/server/alarm/model/types.go
+++ b/server/alarm/model/types.go
@@ -16,7 +16,7 @@
 package model
 
 import (
-	nf "github.com/apache/servicecomb-service-center/pkg/notify"
+	nf "github.com/apache/servicecomb-service-center/pkg/event"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 )
 
diff --git a/server/alarm/service.go b/server/alarm/service.go
index 3aeeb3b..6b3cc1b 100644
--- a/server/alarm/service.go
+++ b/server/alarm/service.go
@@ -18,11 +18,11 @@ package alarm
 import (
 	"sync"
 
+	nf "github.com/apache/servicecomb-service-center/pkg/event"
 	"github.com/apache/servicecomb-service-center/pkg/log"
-	nf "github.com/apache/servicecomb-service-center/pkg/notify"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 	"github.com/apache/servicecomb-service-center/server/alarm/model"
-	"github.com/apache/servicecomb-service-center/server/notify"
+	"github.com/apache/servicecomb-service-center/server/event"
 )
 
 var (
@@ -45,7 +45,7 @@ func (ac *Service) Raise(id model.ID, fields ...model.Field) error {
 	for _, f := range fields {
 		ae.Fields[f.Key] = f.Value
 	}
-	return notify.Center().Publish(ae)
+	return event.Center().Fire(ae)
 }
 
 func (ac *Service) Clear(id model.ID) error {
@@ -54,7 +54,7 @@ func (ac *Service) Clear(id model.ID) error {
 		Status: Cleared,
 		ID:     id,
 	}
-	return notify.Center().Publish(ae)
+	return event.Center().Fire(ae)
 }
 
 func (ac *Service) ListAll() (ls []*model.AlarmEvent) {
@@ -89,16 +89,9 @@ func NewAlarmService() *Service {
 	c := &Service{
 		Subscriber: nf.NewSubscriber(ALARM, Subject, Group),
 	}
-	err := notify.Center().AddSubscriber(c)
+	err := event.Center().AddSubscriber(c)
 	if err != nil {
 		log.Error("", err)
 	}
 	return c
 }
-
-func Center() *Service {
-	once.Do(func() {
-		service = NewAlarmService()
-	})
-	return service
-}
diff --git a/server/connection/grpc/stream.go b/server/connection/grpc/stream.go
index d8d0c1e..b74acc3 100644
--- a/server/connection/grpc/stream.go
+++ b/server/connection/grpc/stream.go
@@ -26,14 +26,14 @@ import (
 	"github.com/apache/servicecomb-service-center/pkg/proto"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 	"github.com/apache/servicecomb-service-center/server/connection"
+	"github.com/apache/servicecomb-service-center/server/event"
 	"github.com/apache/servicecomb-service-center/server/metrics"
-	"github.com/apache/servicecomb-service-center/server/notify"
 	pb "github.com/go-chassis/cari/discovery"
 )
 
 const GRPC = "gRPC"
 
-func Handle(watcher *notify.InstanceEventListWatcher, stream proto.ServiceInstanceCtrlWatchServer) (err error) {
+func Handle(watcher *event.InstanceEventListWatcher, stream proto.ServiceInstanceCtrlWatchServer) (err error) {
 	timer := time.NewTimer(connection.HeartbeatInterval)
 	defer timer.Stop()
 	for {
@@ -72,8 +72,8 @@ func Handle(watcher *notify.InstanceEventListWatcher, stream proto.ServiceInstan
 func ListAndWatch(ctx context.Context, serviceID string, f func() ([]*pb.WatchInstanceResponse, int64), stream proto.ServiceInstanceCtrlWatchServer) (err error) {
 	domainProject := util.ParseDomainProject(ctx)
 	domain := util.ParseDomain(ctx)
-	watcher := notify.NewInstanceEventListWatcher(serviceID, domainProject, f)
-	err = notify.Center().AddSubscriber(watcher)
+	watcher := event.NewInstanceEventListWatcher(serviceID, domainProject, f)
+	err = event.Center().AddSubscriber(watcher)
 	if err != nil {
 		return
 	}
diff --git a/server/connection/grpc/stream_test.go b/server/connection/grpc/stream_test.go
index 33e8ee5..3381167 100644
--- a/server/connection/grpc/stream_test.go
+++ b/server/connection/grpc/stream_test.go
@@ -24,7 +24,7 @@ import (
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	simple "github.com/apache/servicecomb-service-center/pkg/time"
 	stream "github.com/apache/servicecomb-service-center/server/connection/grpc"
-	"github.com/apache/servicecomb-service-center/server/notify"
+	"github.com/apache/servicecomb-service-center/server/event"
 	pb "github.com/go-chassis/cari/discovery"
 	"google.golang.org/grpc"
 )
@@ -42,13 +42,13 @@ func (x *grpcWatchServer) Context() context.Context {
 }
 
 func TestHandleWatchJob(t *testing.T) {
-	w := notify.NewInstanceEventListWatcher("g", "s", nil)
+	w := event.NewInstanceEventListWatcher("g", "s", nil)
 	w.Job <- nil
 	err := stream.Handle(w, &grpcWatchServer{})
 	if err == nil {
 		t.Fatalf("TestHandleWatchJob failed")
 	}
-	w.Job <- notify.NewInstanceEventWithTime("g", "s", 1, simple.FromTime(time.Now()), nil)
+	w.Job <- event.NewInstanceEventWithTime("g", "s", 1, simple.FromTime(time.Now()), nil)
 	w.Job <- nil
 	stream.Handle(w, &grpcWatchServer{})
 }
diff --git a/server/connection/ws/websocket.go b/server/connection/ws/websocket.go
index fc3c386..57b064c 100644
--- a/server/connection/ws/websocket.go
+++ b/server/connection/ws/websocket.go
@@ -27,8 +27,8 @@ import (
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 	"github.com/apache/servicecomb-service-center/server/connection"
+	"github.com/apache/servicecomb-service-center/server/event"
 	"github.com/apache/servicecomb-service-center/server/metrics"
-	"github.com/apache/servicecomb-service-center/server/notify"
 	pb "github.com/go-chassis/cari/discovery"
 	"github.com/gorilla/websocket"
 )
@@ -40,7 +40,7 @@ type WebSocket struct {
 	ticker *time.Ticker
 	conn   *websocket.Conn
 	// watcher subscribe the notification service event
-	watcher         *notify.InstanceEventListWatcher
+	watcher         *event.InstanceEventListWatcher
 	needPingWatcher bool
 	free            chan struct{}
 	closed          chan struct{}
@@ -57,7 +57,7 @@ func (wh *WebSocket) Init() error {
 	remoteAddr := wh.conn.RemoteAddr().String()
 
 	// put in notification service queue
-	if err := notify.Center().AddSubscriber(wh.watcher); err != nil {
+	if err := event.Center().AddSubscriber(wh.watcher); err != nil {
 		err = fmt.Errorf("establish[%s] websocket watch failed: notify service error, %s",
 			remoteAddr, err.Error())
 		log.Errorf(nil, err.Error())
@@ -228,7 +228,7 @@ func (wh *WebSocket) HandleEvent(o interface{}) {
 		log.Debugf("send 'Ping' message to watcher[%s], subject: %s, group: %s",
 			remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
 		return
-	case *notify.InstanceEvent:
+	case *event.InstanceEvent:
 		resp := o.Response
 
 		providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, resp.Key.ServiceName, resp.Key.Version)
@@ -260,7 +260,7 @@ func (wh *WebSocket) HandleEvent(o interface{}) {
 	}
 
 	err := wh.WriteMessage(message)
-	if evt, ok := o.(*notify.InstanceEvent); ok {
+	if evt, ok := o.(*event.InstanceEvent); ok {
 		metrics.ReportPublishCompleted(evt, err)
 	}
 	if err != nil {
@@ -295,7 +295,7 @@ func (wh *WebSocket) Stop() {
 func ListAndWatch(ctx context.Context, serviceID string, f func() ([]*pb.WatchInstanceResponse, int64), conn *websocket.Conn) {
 	domainProject := util.ParseDomainProject(ctx)
 	domain := util.ParseDomain(ctx)
-	socket := New(ctx, conn, notify.NewInstanceEventListWatcher(serviceID, domainProject, f))
+	socket := New(ctx, conn, event.NewInstanceEventListWatcher(serviceID, domainProject, f))
 
 	metrics.ReportSubscriber(domain, Websocket, 1)
 	process(socket)
@@ -320,7 +320,7 @@ func SendEstablishError(conn *websocket.Conn, err error) {
 	}
 }
 
-func New(ctx context.Context, conn *websocket.Conn, watcher *notify.InstanceEventListWatcher) *WebSocket {
+func New(ctx context.Context, conn *websocket.Conn, watcher *event.InstanceEventListWatcher) *WebSocket {
 	return &WebSocket{
 		ctx:     ctx,
 		conn:    conn,
diff --git a/server/connection/ws/websocket_test.go b/server/connection/ws/websocket_test.go
index 2764084..7f21727 100644
--- a/server/connection/ws/websocket_test.go
+++ b/server/connection/ws/websocket_test.go
@@ -18,21 +18,19 @@ package ws_test
 
 // initialize
 import (
-	"context"
-	"errors"
-
 	_ "github.com/apache/servicecomb-service-center/test"
 
-	wss "github.com/apache/servicecomb-service-center/server/connection/ws"
-	"github.com/apache/servicecomb-service-center/server/core"
-
+	"context"
+	"errors"
 	"net/http"
 	"net/http/httptest"
 	"strings"
 	"testing"
 	"time"
 
-	. "github.com/apache/servicecomb-service-center/server/notify"
+	wss "github.com/apache/servicecomb-service-center/server/connection/ws"
+	"github.com/apache/servicecomb-service-center/server/core"
+	"github.com/apache/servicecomb-service-center/server/event"
 	"github.com/go-chassis/cari/discovery"
 	"github.com/gorilla/websocket"
 )
@@ -71,7 +69,7 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
 
 	wss.SendEstablishError(conn, errors.New("error"))
 
-	w := NewInstanceEventListWatcher("g", "s", func() (results []*discovery.WatchInstanceResponse, rev int64) {
+	w := event.NewInstanceEventListWatcher("g", "s", func() (results []*discovery.WatchInstanceResponse, rev int64) {
 		results = append(results, &discovery.WatchInstanceResponse{
 			Response: discovery.CreateResponse(discovery.ResponseSuccess, "ok"),
 			Action:   string(discovery.EVT_CREATE),
@@ -87,12 +85,12 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
 		t.Fatalf("TestPublisher_Run")
 	}
 
-	Center().Start()
+	event.Center().Start()
 
 	go func() {
 		wss.ListAndWatch(context.Background(), "", nil, conn)
 
-		w2 := NewInstanceEventListWatcher("g", "s", func() (results []*discovery.WatchInstanceResponse, rev int64) {
+		w2 := event.NewInstanceEventListWatcher("g", "s", func() (results []*discovery.WatchInstanceResponse, rev int64) {
 			return
 		})
 		ws2 := wss.New(context.Background(), conn, w2)
@@ -105,9 +103,9 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
 	go ws.HandleControlMessage()
 
 	w.OnMessage(nil)
-	w.OnMessage(&InstanceEvent{})
+	w.OnMessage(&event.InstanceEvent{})
 
-	Center().Publish(NewInstanceEvent("g", "s", 1, &discovery.WatchInstanceResponse{
+	event.Center().Fire(event.NewInstanceEvent("g", "s", 1, &discovery.WatchInstanceResponse{
 		Response: discovery.CreateResponse(discovery.ResponseSuccess, "ok"),
 		Action:   string(discovery.EVT_CREATE),
 		Key:      &discovery.MicroServiceKey{},
diff --git a/server/notify/center.go b/server/event/center.go
similarity index 69%
rename from server/notify/center.go
rename to server/event/center.go
index 35fa0e5..732a762 100644
--- a/server/notify/center.go
+++ b/server/event/center.go
@@ -13,18 +13,21 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package notify
+package event
 
 import (
-	"github.com/apache/servicecomb-service-center/pkg/notify"
+	"github.com/apache/servicecomb-service-center/pkg/event"
 )
 
-var notifyService *notify.Service
+var busService *event.BusService
 
 func init() {
-	notifyService = notify.NewNotifyService()
+	busService = event.NewBusService()
 }
 
-func Center() *notify.Service {
-	return notifyService
+// Center handles different types of events.
+// The event type can be 'ALARM' (business alarms), 'RESOURCE' (resource changes, like INSTANCE), or
+// the inner type 'NOTIFY' (subscriber health check).
+func Center() *event.BusService {
+	return busService
 }
diff --git a/server/notify/instance_subscriber.go b/server/event/instance_subscriber.go
similarity index 84%
rename from server/notify/instance_subscriber.go
rename to server/event/instance_subscriber.go
index ed295d8..f61f622 100644
--- a/server/notify/instance_subscriber.go
+++ b/server/event/instance_subscriber.go
@@ -15,15 +15,15 @@
  * limitations under the License.
  */
 
-package notify
+package event
 
 import (
 	"context"
 	"time"
 
+	"github.com/apache/servicecomb-service-center/pkg/event"
 	"github.com/apache/servicecomb-service-center/pkg/gopool"
 	"github.com/apache/servicecomb-service-center/pkg/log"
-	"github.com/apache/servicecomb-service-center/pkg/notify"
 	simple "github.com/apache/servicecomb-service-center/pkg/time"
 	pb "github.com/go-chassis/cari/discovery"
 )
@@ -33,17 +33,17 @@ const (
 	EventQueueSize = 5000
 )
 
-var INSTANCE = notify.RegisterType("INSTANCE", EventQueueSize)
+var INSTANCE = event.RegisterType("INSTANCE", EventQueueSize)
 
 // 状态变化推送
 type InstanceEvent struct {
-	notify.Event
+	event.Event
 	Revision int64
 	Response *pb.WatchInstanceResponse
 }
 
 type InstanceEventListWatcher struct {
-	notify.Subscriber
+	event.Subscriber
 	Job          chan *InstanceEvent
 	ListRevision int64
 	ListFunc     func() (results []*pb.WatchInstanceResponse, rev int64)
@@ -53,7 +53,7 @@ type InstanceEventListWatcher struct {
 func (w *InstanceEventListWatcher) SetError(err error) {
 	w.Subscriber.SetError(err)
 	// 触发清理job
-	e := w.Service().Publish(notify.NewErrEvent(w))
+	e := w.Bus().Fire(event.NewUnhealthyEvent(w))
 	if e != nil {
 		log.Error("", e)
 	}
@@ -63,7 +63,7 @@ func (w *InstanceEventListWatcher) OnAccept() {
 	if w.Err() != nil {
 		return
 	}
-	log.Debugf("accepted by notify service, %s watcher %s %s", w.Type(), w.Group(), w.Subject())
+	log.Debugf("accepted by event service, %s watcher %s %s", w.Type(), w.Group(), w.Subject())
 	gopool.Go(w.listAndPublishJobs)
 }
 
@@ -80,7 +80,7 @@ func (w *InstanceEventListWatcher) listAndPublishJobs(_ context.Context) {
 }
 
 //被通知
-func (w *InstanceEventListWatcher) OnMessage(job notify.Event) {
+func (w *InstanceEventListWatcher) OnMessage(job event.Event) {
 	if w.Err() != nil {
 		return
 	}
@@ -106,7 +106,7 @@ func (w *InstanceEventListWatcher) OnMessage(job notify.Event) {
 
 	// the negative revision is specially for mongo scene,should be removed after mongo support revison.
 	if wJob.Revision >= 0 && wJob.Revision <= w.ListRevision {
-		log.Warnf("unexpected notify %s job is coming in, watcher %s %s, job is %v, current revision is %v",
+		log.Warnf("unexpected event %s job is coming in, watcher %s %s, job is %v, current revision is %v",
 			w.Type(), w.Group(), w.Subject(), job, w.ListRevision)
 		return
 	}
@@ -140,7 +140,7 @@ func (w *InstanceEventListWatcher) Close() {
 
 func NewInstanceEvent(serviceID, domainProject string, rev int64, response *pb.WatchInstanceResponse) *InstanceEvent {
 	return &InstanceEvent{
-		Event:    notify.NewEvent(INSTANCE, domainProject, serviceID),
+		Event:    event.NewEvent(INSTANCE, domainProject, serviceID),
 		Revision: rev,
 		Response: response,
 	}
@@ -148,7 +148,7 @@ func NewInstanceEvent(serviceID, domainProject string, rev int64, response *pb.W
 
 func NewInstanceEventWithTime(serviceID, domainProject string, rev int64, createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
 	return &InstanceEvent{
-		Event:    notify.NewEventWithTime(INSTANCE, domainProject, serviceID, createAt),
+		Event:    event.NewEventWithTime(INSTANCE, domainProject, serviceID, createAt),
 		Revision: rev,
 		Response: response,
 	}
@@ -157,7 +157,7 @@ func NewInstanceEventWithTime(serviceID, domainProject string, rev int64, create
 func NewInstanceEventListWatcher(serviceID, domainProject string,
 	listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *InstanceEventListWatcher {
 	watcher := &InstanceEventListWatcher{
-		Subscriber: notify.NewSubscriber(INSTANCE, domainProject, serviceID),
+		Subscriber: event.NewSubscriber(INSTANCE, domainProject, serviceID),
 		Job:        make(chan *InstanceEvent, INSTANCE.QueueSize()),
 		ListFunc:   listFunc,
 		listCh:     make(chan struct{}),
diff --git a/server/health/health_test.go b/server/health/health_test.go
index d99cfec..804df1e 100644
--- a/server/health/health_test.go
+++ b/server/health/health_test.go
@@ -16,15 +16,14 @@
 package health
 
 import (
+	"github.com/apache/servicecomb-service-center/server/alarm"
+	"github.com/apache/servicecomb-service-center/server/event"
 	"testing"
 	"time"
-
-	"github.com/apache/servicecomb-service-center/server/alarm"
-	"github.com/apache/servicecomb-service-center/server/notify"
 )
 
 func TestDefaultHealthChecker_Healthy(t *testing.T) {
-	notify.Center().Start()
+	event.Center().Start()
 
 	// normal case
 	var hc DefaultHealthChecker
diff --git a/server/metrics/connection.go b/server/metrics/connection.go
index 11eed2b..aea7c97 100644
--- a/server/metrics/connection.go
+++ b/server/metrics/connection.go
@@ -18,10 +18,10 @@
 package metrics
 
 import (
+	"github.com/apache/servicecomb-service-center/pkg/event"
 	"time"
 
 	"github.com/apache/servicecomb-service-center/pkg/metrics"
-	"github.com/apache/servicecomb-service-center/pkg/notify"
 	helper "github.com/apache/servicecomb-service-center/pkg/prometheus"
 	"github.com/prometheus/client_golang/prometheus"
 )
@@ -53,7 +53,7 @@ var (
 		}, []string{"instance", "domain", "scheme"})
 )
 
-func ReportPublishCompleted(evt notify.Event, err error) {
+func ReportPublishCompleted(evt event.Event, err error) {
 	instance := metrics.InstanceName()
 	elapsed := float64(time.Since(evt.CreateAt()).Nanoseconds()) / float64(time.Microsecond)
 	status := success
diff --git a/server/server.go b/server/server.go
index ac7dee7..8b6f099 100644
--- a/server/server.go
+++ b/server/server.go
@@ -19,23 +19,23 @@ package server
 
 import (
 	"context"
+	"github.com/apache/servicecomb-service-center/server/event"
 	"net"
 	"os"
 	"strings"
 	"time"
 
 	"github.com/apache/servicecomb-service-center/datasource"
+	nf "github.com/apache/servicecomb-service-center/pkg/event"
 	"github.com/apache/servicecomb-service-center/pkg/gopool"
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/metrics"
-	nf "github.com/apache/servicecomb-service-center/pkg/notify"
 	"github.com/apache/servicecomb-service-center/pkg/plugin"
 	"github.com/apache/servicecomb-service-center/pkg/signal"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 	"github.com/apache/servicecomb-service-center/server/command"
 	"github.com/apache/servicecomb-service-center/server/config"
 	"github.com/apache/servicecomb-service-center/server/core"
-	"github.com/apache/servicecomb-service-center/server/notify"
 	"github.com/apache/servicecomb-service-center/server/plugin/security/tlsconf"
 	"github.com/apache/servicecomb-service-center/server/service/gov"
 	"github.com/apache/servicecomb-service-center/server/service/rbac"
@@ -64,7 +64,7 @@ type ServiceCenterServer struct {
 	GRPC endpoint
 
 	apiService          *APIServer
-	notifyService       *nf.Service
+	eventCenter         *nf.BusService
 	syncerNotifyService *snf.Service
 }
 
@@ -96,7 +96,7 @@ func (s *ServiceCenterServer) initialize() {
 	// Datasource
 	s.initDatasource()
 	s.apiService = GetAPIServer()
-	s.notifyService = notify.Center()
+	s.eventCenter = event.Center()
 	s.syncerNotifyService = snf.GetSyncerNotifyCenter()
 }
 
@@ -181,7 +181,7 @@ func (s *ServiceCenterServer) initSSL() {
 
 func (s *ServiceCenterServer) startServices() {
 	// notifications
-	s.notifyService.Start()
+	s.eventCenter.Start()
 
 	// notify syncer
 	syncerEnabled := config.GetBool("syncer.enabled", false)
@@ -217,8 +217,8 @@ func (s *ServiceCenterServer) Stop() {
 		s.apiService.Stop()
 	}
 
-	if s.notifyService != nil {
-		s.notifyService.Stop()
+	if s.eventCenter != nil {
+		s.eventCenter.Stop()
 	}
 
 	if s.syncerNotifyService != nil {