You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/05/18 07:11:38 UTC
[servicecomb-service-center] branch master updated: SCB-2176
Refactor event bus (#977)
This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new e9b3f1a SCB-2176 Refactor event bus (#977)
e9b3f1a is described below
commit e9b3f1a32b9ebc8ccd10f07f142615569c331111
Author: little-cui <su...@qq.com>
AuthorDate: Tue May 18 15:11:32 2021 +0800
SCB-2176 Refactor event bus (#977)
* SCB-2176 Refactor event bus
* SCB-2176 Fix: UT failures
---
datasource/etcd/event/instance_event_handler.go | 8 +--
datasource/etcd/event/rule_event_handler.go | 4 +-
datasource/etcd/event/tag_event_handler.go | 4 +-
datasource/etcd/sd/k8s/adaptor/listwatcher.go | 6 +-
datasource/mongo/event/instance_event_handler.go | 6 +-
pkg/{notify/processor.go => event/bus.go} | 58 ++++++++++---------
.../bus_service.go} | 65 +++++++++++----------
.../bus_service_test.go} | 12 ++--
.../processor_test.go => event/bus_test.go} | 12 ++--
pkg/{notify => event}/common.go | 6 +-
pkg/{notify/notice.go => event/event.go} | 2 +-
pkg/{notify/notice_test.go => event/event_test.go} | 7 +--
pkg/{notify/subject.go => event/poster.go} | 41 +++++++------
.../subject_test.go => event/poster_test.go} | 16 +++---
pkg/{notify => event}/subscriber.go | 28 ++++-----
pkg/event/subscriber_checker.go | 63 ++++++++++++++++++++
pkg/{notify/group.go => event/subscriber_group.go} | 38 ++++++------
.../subscriber_group_test.go} | 25 ++++----
pkg/{notify => event}/types.go | 14 +++--
pkg/{notify => event}/types_test.go | 9 +--
pkg/notify/subscriber_checker.go | 67 ----------------------
pkg/queue/taskqueue.go | 6 +-
pkg/queue/taskqueue_test.go | 10 ++--
pkg/notify/common.go => server/alarm/center.go | 11 ++--
server/alarm/common.go | 5 +-
server/alarm/model/types.go | 2 +-
server/alarm/service.go | 17 ++----
server/connection/grpc/stream.go | 8 +--
server/connection/grpc/stream_test.go | 6 +-
server/connection/ws/websocket.go | 14 ++---
server/connection/ws/websocket_test.go | 22 ++++---
server/{notify => event}/center.go | 15 +++--
server/{notify => event}/instance_subscriber.go | 24 ++++----
server/health/health_test.go | 7 +--
server/metrics/connection.go | 4 +-
server/server.go | 14 ++---
36 files changed, 331 insertions(+), 325 deletions(-)
diff --git a/datasource/etcd/event/instance_event_handler.go b/datasource/etcd/event/instance_event_handler.go
index d5058e9..7297b7c 100644
--- a/datasource/etcd/event/instance_event_handler.go
+++ b/datasource/etcd/event/instance_event_handler.go
@@ -33,8 +33,8 @@ import (
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/core"
+ "github.com/apache/servicecomb-service-center/server/event"
"github.com/apache/servicecomb-service-center/server/metrics"
- "github.com/apache/servicecomb-service-center/server/notify"
"github.com/apache/servicecomb-service-center/server/syncernotify"
)
@@ -104,7 +104,7 @@ func (h *InstanceEventHandler) OnEvent(evt sd.KvEvent) {
NotifySyncerInstanceEvent(evt, domainProject, ms)
}
- if notify.Center().Closed() {
+ if event.Center().Closed() {
log.Warn(fmt.Sprintf("caught [%s] instance[%s/%s] event, endpoints %v, but notify service is closed",
action, providerID, providerInstanceID, instance.Endpoints))
return
@@ -147,8 +147,8 @@ func PublishInstanceEvent(evt sd.KvEvent, domainProject string, serviceKey *pb.M
}
for _, consumerID := range subscribers {
// TODO add超时怎么处理?
- evt := notify.NewInstanceEventWithTime(consumerID, domainProject, evt.Revision, evt.CreateAt, response)
- err := notify.Center().Publish(evt)
+ evt := event.NewInstanceEventWithTime(consumerID, domainProject, evt.Revision, evt.CreateAt, response)
+ err := event.Center().Fire(evt)
if err != nil {
log.Errorf(err, "publish event[%v] into channel failed", evt)
}
diff --git a/datasource/etcd/event/rule_event_handler.go b/datasource/etcd/event/rule_event_handler.go
index 1c59eef..9955f23 100644
--- a/datasource/etcd/event/rule_event_handler.go
+++ b/datasource/etcd/event/rule_event_handler.go
@@ -32,7 +32,7 @@ import (
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/task"
"github.com/apache/servicecomb-service-center/pkg/util"
- "github.com/apache/servicecomb-service-center/server/notify"
+ "github.com/apache/servicecomb-service-center/server/event"
)
type RulesChangedTask struct {
@@ -100,7 +100,7 @@ func (h *RuleEventHandler) OnEvent(evt sd.KvEvent) {
}
providerID, ruleID, domainProject := path.GetInfoFromRuleKV(evt.KV.Key)
- if notify.Center().Closed() {
+ if event.Center().Closed() {
log.Warnf("caught [%s] service rule[%s/%s] event, but notify service is closed",
action, providerID, ruleID)
return
diff --git a/datasource/etcd/event/tag_event_handler.go b/datasource/etcd/event/tag_event_handler.go
index 46fadeb..17a2d31 100644
--- a/datasource/etcd/event/tag_event_handler.go
+++ b/datasource/etcd/event/tag_event_handler.go
@@ -33,7 +33,7 @@ import (
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/task"
"github.com/apache/servicecomb-service-center/pkg/util"
- "github.com/apache/servicecomb-service-center/server/notify"
+ "github.com/apache/servicecomb-service-center/server/event"
)
type TagsChangedTask struct {
@@ -114,7 +114,7 @@ func (h *TagEventHandler) OnEvent(evt sd.KvEvent) {
consumerID, domainProject := path.GetInfoFromTagKV(evt.KV.Key)
- if notify.Center().Closed() {
+ if event.Center().Closed() {
log.Warnf("caught [%s] service tags[%s/%s] event, but notify service is closed",
action, consumerID, evt.KV.Value)
return
diff --git a/datasource/etcd/sd/k8s/adaptor/listwatcher.go b/datasource/etcd/sd/k8s/adaptor/listwatcher.go
index cf511bb..87af286 100644
--- a/datasource/etcd/sd/k8s/adaptor/listwatcher.go
+++ b/datasource/etcd/sd/k8s/adaptor/listwatcher.go
@@ -60,16 +60,16 @@ func NewListWatcher(t K8sType, lister cache.SharedIndexInformer, f OnEventFunc)
lw.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
- Queue(t).Add(queue.Task{Object: K8sEvent{EventType: pb.EVT_CREATE, Object: obj}})
+ Queue(t).Add(queue.Task{Payload: K8sEvent{EventType: pb.EVT_CREATE, Object: obj}})
},
UpdateFunc: func(old, new interface{}) {
if !reflect.DeepEqual(old, new) {
- Queue(t).Add(queue.Task{Object: K8sEvent{EventType: pb.EVT_UPDATE, Object: new,
+ Queue(t).Add(queue.Task{Payload: K8sEvent{EventType: pb.EVT_UPDATE, Object: new,
PrevObject: old}})
}
},
DeleteFunc: func(obj interface{}) {
- Queue(t).Add(queue.Task{Object: K8sEvent{EventType: pb.EVT_DELETE, Object: obj}})
+ Queue(t).Add(queue.Task{Payload: K8sEvent{EventType: pb.EVT_DELETE, Object: obj}})
},
})
Queue(t).AddWorker(lw)
diff --git a/datasource/mongo/event/instance_event_handler.go b/datasource/mongo/event/instance_event_handler.go
index 9d85aba..a32c5ce 100644
--- a/datasource/mongo/event/instance_event_handler.go
+++ b/datasource/mongo/event/instance_event_handler.go
@@ -34,8 +34,8 @@ import (
"github.com/apache/servicecomb-service-center/pkg/log"
simple "github.com/apache/servicecomb-service-center/pkg/time"
"github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/event"
"github.com/apache/servicecomb-service-center/server/metrics"
- "github.com/apache/servicecomb-service-center/server/notify"
"github.com/apache/servicecomb-service-center/server/syncernotify"
)
@@ -109,8 +109,8 @@ func PublishInstanceEvent(evt sd.MongoEvent, domainProject string, serviceKey *d
Instance: evt.Value.(model.Instance).Instance,
}
for _, consumerID := range subscribers {
- evt := notify.NewInstanceEventWithTime(consumerID, domainProject, -1, simple.FromTime(time.Now()), response)
- err := notify.Center().Publish(evt)
+ evt := event.NewInstanceEventWithTime(consumerID, domainProject, -1, simple.FromTime(time.Now()), response)
+ err := event.Center().Fire(evt)
if err != nil {
log.Error(fmt.Sprintf("publish event[%v] into channel failed", evt), err)
}
diff --git a/pkg/notify/processor.go b/pkg/event/bus.go
similarity index 54%
rename from pkg/notify/processor.go
rename to pkg/event/bus.go
index 1bee22d..a573381 100644
--- a/pkg/notify/processor.go
+++ b/pkg/event/bus.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package notify
+package event
import (
"context"
@@ -24,74 +24,76 @@ import (
"github.com/apache/servicecomb-service-center/pkg/util"
)
-type Processor struct {
+// Bus fires events asynchronously and dispatches them to subscribers according to subject
+type Bus struct {
*queue.TaskQueue
name string
subjects *util.ConcurrentMap
}
-func (p *Processor) Name() string {
- return p.name
+func (bus *Bus) Name() string {
+ return bus.name
}
-func (p *Processor) Accept(evt Event) {
- p.Add(queue.Task{Object: evt})
+func (bus *Bus) Fire(evt Event) {
+ // TODO add option if queue is full
+ bus.Add(queue.Task{Payload: evt})
}
-func (p *Processor) Handle(ctx context.Context, obj interface{}) {
- p.Notify(obj.(Event))
+func (bus *Bus) Handle(ctx context.Context, evt interface{}) {
+ bus.fireAtOnce(evt.(Event))
}
-func (p *Processor) Notify(evt Event) {
- if itf, ok := p.subjects.Get(evt.Subject()); ok {
- itf.(*Subject).Notify(evt)
+func (bus *Bus) fireAtOnce(evt Event) {
+ if itf, ok := bus.subjects.Get(evt.Subject()); ok {
+ itf.(*Poster).Post(evt)
}
}
-func (p *Processor) Subjects(name string) *Subject {
- itf, ok := p.subjects.Get(name)
+func (bus *Bus) Subjects(name string) *Poster {
+ itf, ok := bus.subjects.Get(name)
if !ok {
return nil
}
- return itf.(*Subject)
+ return itf.(*Poster)
}
-func (p *Processor) AddSubscriber(n Subscriber) {
- item, _ := p.subjects.Fetch(n.Subject(), func() (interface{}, error) {
- return NewSubject(n.Subject()), nil
+func (bus *Bus) AddSubscriber(n Subscriber) {
+ item, _ := bus.subjects.Fetch(n.Subject(), func() (interface{}, error) {
+ return NewPoster(n.Subject()), nil
})
- item.(*Subject).GetOrNewGroup(n.Group()).AddSubscriber(n)
+ item.(*Poster).GetOrNewGroup(n.Group()).AddMember(n)
}
-func (p *Processor) Remove(n Subscriber) {
- itf, ok := p.subjects.Get(n.Subject())
+func (bus *Bus) RemoveSubscriber(n Subscriber) {
+ itf, ok := bus.subjects.Get(n.Subject())
if !ok {
return
}
- s := itf.(*Subject)
+ s := itf.(*Poster)
g := s.Groups(n.Group())
if g == nil {
return
}
- g.Remove(n.ID())
+ g.RemoveMember(n.ID())
if g.Size() == 0 {
- s.Remove(g.Name())
+ s.RemoveGroup(g.Name())
}
if s.Size() == 0 {
- p.subjects.Remove(s.Name())
+ bus.subjects.Remove(s.Subject())
}
}
-func (p *Processor) Clear() {
- p.subjects.Clear()
+func (bus *Bus) Clear() {
+ bus.subjects.Clear()
}
-func NewProcessor(name string, queueSize int) *Processor {
- p := &Processor{
+func NewBus(name string, queueSize int) *Bus {
+ p := &Bus{
TaskQueue: queue.NewTaskQueue(queueSize),
name: name,
subjects: util.NewConcurrentMap(0),
diff --git a/pkg/notify/notification_service.go b/pkg/event/bus_service.go
similarity index 66%
rename from pkg/notify/notification_service.go
rename to pkg/event/bus_service.go
index d63cace..2b53597 100644
--- a/pkg/notify/notification_service.go
+++ b/pkg/event/bus_service.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package notify
+package event
import (
"errors"
@@ -25,15 +25,18 @@ import (
"github.com/apache/servicecomb-service-center/pkg/log"
)
-type Service struct {
- processors map[Type]*Processor
- mux sync.RWMutex
- isClose bool
+// BusService is the daemon service that manages multiple types of Bus
+// and wraps the handle methods of Bus
+type BusService struct {
+ // buses is the map of event handler, key is event source type
+ buses map[Type]*Bus
+ mux sync.RWMutex
+ isClose bool
}
-func (s *Service) newProcessor(t Type) *Processor {
+func (s *BusService) newBus(t Type) *Bus {
s.mux.RLock()
- p, ok := s.processors[t]
+ p, ok := s.buses[t]
if ok {
s.mux.RUnlock()
return p
@@ -41,20 +44,20 @@ func (s *Service) newProcessor(t Type) *Processor {
s.mux.RUnlock()
s.mux.Lock()
- p, ok = s.processors[t]
+ p, ok = s.buses[t]
if ok {
s.mux.Unlock()
return p
}
- p = NewProcessor(t.String(), t.QueueSize())
- s.processors[t] = p
+ p = NewBus(t.String(), t.QueueSize())
+ s.buses[t] = p
s.mux.Unlock()
p.Run()
return p
}
-func (s *Service) Start() {
+func (s *BusService) Start() {
if !s.Closed() {
log.Warnf("notify service is already running")
return
@@ -64,7 +67,7 @@ func (s *Service) Start() {
s.mux.Unlock()
// 错误subscriber清理
- err := s.AddSubscriber(NewSubscriberChecker())
+ err := s.AddSubscriber(NewSubscriberHealthChecker())
if err != nil {
log.Error("", err)
}
@@ -72,7 +75,7 @@ func (s *Service) Start() {
log.Debugf("notify service is started")
}
-func (s *Service) AddSubscriber(n Subscriber) error {
+func (s *BusService) AddSubscriber(n Subscriber) error {
if n == nil {
err := errors.New("required Subscriber")
log.Errorf(err, "add subscriber failed")
@@ -85,30 +88,30 @@ func (s *Service) AddSubscriber(n Subscriber) error {
return err
}
- p := s.newProcessor(n.Type())
- n.SetService(s)
+ p := s.newBus(n.Type())
+ n.SetBus(s)
n.OnAccept()
p.AddSubscriber(n)
return nil
}
-func (s *Service) RemoveSubscriber(n Subscriber) {
+func (s *BusService) RemoveSubscriber(n Subscriber) {
s.mux.RLock()
- p, ok := s.processors[n.Type()]
+ p, ok := s.buses[n.Type()]
if !ok {
s.mux.RUnlock()
return
}
s.mux.RUnlock()
- p.Remove(n)
+ p.RemoveSubscriber(n)
n.Close()
}
-func (s *Service) stopProcessors() {
+func (s *BusService) closeBuses() {
s.mux.RLock()
- for _, p := range s.processors {
+ for _, p := range s.buses {
p.Clear()
p.Stop()
}
@@ -116,30 +119,30 @@ func (s *Service) stopProcessors() {
}
//通知内容塞到队列里
-func (s *Service) Publish(evt Event) error {
+func (s *BusService) Fire(evt Event) error {
if s.Closed() {
- return errors.New("add notify event failed for server shutdown")
+ return errors.New("add notify evt failed for server shutdown")
}
s.mux.RLock()
- p, ok := s.processors[evt.Type()]
+ bus, ok := s.buses[evt.Type()]
if !ok {
s.mux.RUnlock()
return fmt.Errorf("unknown event type[%s]", evt.Type())
}
s.mux.RUnlock()
- p.Accept(evt)
+ bus.Fire(evt)
return nil
}
-func (s *Service) Closed() (b bool) {
+func (s *BusService) Closed() (b bool) {
s.mux.RLock()
b = s.isClose
s.mux.RUnlock()
return
}
-func (s *Service) Stop() {
+func (s *BusService) Stop() {
if s.Closed() {
return
}
@@ -147,14 +150,14 @@ func (s *Service) Stop() {
s.isClose = true
s.mux.Unlock()
- s.stopProcessors()
+ s.closeBuses()
log.Debug("notify service stopped")
}
-func NewNotifyService() *Service {
- return &Service{
- processors: make(map[Type]*Processor),
- isClose: true,
+func NewBusService() *BusService {
+ return &BusService{
+ buses: make(map[Type]*Bus),
+ isClose: true,
}
}
diff --git a/pkg/notify/notification_test.go b/pkg/event/bus_service_test.go
similarity index 90%
rename from pkg/notify/notification_test.go
rename to pkg/event/bus_service_test.go
index ba23613..c6ae242 100644
--- a/pkg/notify/notification_test.go
+++ b/pkg/event/bus_service_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package notify
+package event
import (
"testing"
@@ -27,7 +27,7 @@ import (
func TestGetNotifyService(t *testing.T) {
INSTANCE := RegisterType("INSTANCE", 1)
- notifyService := NewNotifyService()
+ notifyService := NewBusService()
if notifyService == nil {
t.Fatalf("TestGetNotifyService failed")
}
@@ -39,7 +39,7 @@ func TestGetNotifyService(t *testing.T) {
if err == nil {
t.Fatalf("TestGetNotifyService failed")
}
- err = notifyService.Publish(nil)
+ err = notifyService.Fire(nil)
if err == nil {
t.Fatalf("TestGetNotifyService failed")
}
@@ -63,15 +63,15 @@ func TestGetNotifyService(t *testing.T) {
t.Fatalf("TestGetNotifyService failed, %v", err)
}
j := &baseEvent{INSTANCE, "s", "g", simple.FromTime(time.Now())}
- err = notifyService.Publish(j)
+ err = notifyService.Fire(j)
if err != nil {
t.Fatalf("TestGetNotifyService failed")
}
- err = notifyService.Publish(NewErrEvent(NewSubscriberChecker()))
+ err = notifyService.Fire(NewUnhealthyEvent(NewSubscriberHealthChecker()))
if err != nil {
t.Fatalf("TestGetNotifyService failed")
}
- err = notifyService.Publish(NewErrEvent(s))
+ err = notifyService.Fire(NewUnhealthyEvent(s))
if err != nil {
t.Fatalf("TestGetNotifyService failed")
}
diff --git a/pkg/notify/processor_test.go b/pkg/event/bus_test.go
similarity index 91%
rename from pkg/notify/processor_test.go
rename to pkg/event/bus_test.go
index 894231b..a5e214b 100644
--- a/pkg/notify/processor_test.go
+++ b/pkg/event/bus_test.go
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package notify
+package event
import (
"testing"
@@ -37,7 +37,7 @@ func TestProcessor_Do(t *testing.T) {
job: make(chan Event, 1)}
mock2 := &mockSubscriberChan{Subscriber: NewSubscriber(INSTANCE, "s1", "g2"),
job: make(chan Event, 1)}
- p := NewProcessor("p1", 0)
+ p := NewBus("p1", 0)
if p.Name() != "p1" {
t.Fatalf("TestProcessor_Do")
}
@@ -45,12 +45,12 @@ func TestProcessor_Do(t *testing.T) {
t.Fatalf("TestProcessor_Do")
}
p.AddSubscriber(mock1)
- if p.Subjects(mock1.Subject()).Groups(mock1.Group()).Subscribers(mock1.ID()) != mock1 {
+ if p.Subjects(mock1.Subject()).Groups(mock1.Group()).Member(mock1.ID()) != mock1 {
t.Fatalf("TestProcessor_Do")
}
- p.Remove(NewSubscriber(INSTANCE, "s2", "g1"))
- p.Remove(NewSubscriber(INSTANCE, "s1", "g2"))
- p.Remove(mock1)
+ p.RemoveSubscriber(NewSubscriber(INSTANCE, "s2", "g1"))
+ p.RemoveSubscriber(NewSubscriber(INSTANCE, "s1", "g2"))
+ p.RemoveSubscriber(mock1)
if p.Subjects(mock1.Subject()) != nil {
t.Fatalf("TestProcessor_Do")
}
diff --git a/pkg/notify/common.go b/pkg/event/common.go
similarity index 95%
copy from pkg/notify/common.go
copy to pkg/event/common.go
index a6b5337..3b2f115 100644
--- a/pkg/notify/common.go
+++ b/pkg/event/common.go
@@ -15,8 +15,12 @@
* limitations under the License.
*/
-package notify
+package event
const (
DefaultQueueSize = 1000
)
+
+const (
+ INNER Type = iota
+)
diff --git a/pkg/notify/notice.go b/pkg/event/event.go
similarity index 99%
rename from pkg/notify/notice.go
rename to pkg/event/event.go
index d20ec7f..d057a80 100644
--- a/pkg/notify/notice.go
+++ b/pkg/event/event.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package notify
+package event
import (
"time"
diff --git a/pkg/notify/notice_test.go b/pkg/event/event_test.go
similarity index 87%
rename from pkg/notify/notice_test.go
rename to pkg/event/event_test.go
index 3eba149..ed181fb 100644
--- a/pkg/notify/notice_test.go
+++ b/pkg/event/event_test.go
@@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package notify
+package event
import (
"fmt"
@@ -21,14 +21,13 @@ import (
)
func TestNewEventWithTime(t *testing.T) {
- regT := RegisterType("N", 1)
- evt := NewEvent(regT, "a", "b")
+ evt := NewEvent(INNER, "a", "b")
if evt.CreateAt().UnixNano() == 0 {
t.Fatal("TestNewEventWithTime")
}
fmt.Println(evt.CreateAt())
- if evt.Type() != regT || evt.Subject() != "a" || evt.Group() != "b" {
+ if evt.Type() != INNER || evt.Subject() != "a" || evt.Group() != "b" {
t.Fatal("TestNewEventWithTime")
}
}
diff --git a/pkg/notify/subject.go b/pkg/event/poster.go
similarity index 66%
rename from pkg/notify/subject.go
rename to pkg/event/poster.go
index 4f79ba9..4cb3f49 100644
--- a/pkg/notify/subject.go
+++ b/pkg/event/poster.go
@@ -15,25 +15,32 @@
* limitations under the License.
*/
-package notify
+package event
import (
"github.com/apache/servicecomb-service-center/pkg/util"
)
-type Subject struct {
- name string
- groups *util.ConcurrentMap
+// Poster posts the events of the specified subject to the subscribers
+type Poster struct {
+ subject string
+ groups *util.ConcurrentMap
}
-func (s *Subject) Name() string {
- return s.name
+func (s *Poster) Subject() string {
+ return s.subject
}
-func (s *Subject) Notify(job Event) {
+func (s *Poster) Post(job Event) {
+ f := func(g *Group) {
+ g.ForEach(func(m Subscriber) {
+ m.OnMessage(job)
+ })
+ }
+
if len(job.Group()) == 0 {
s.groups.ForEach(func(item util.MapItem) (next bool) {
- item.Value.(*Group).Notify(job)
+ f(item.Value.(*Group))
return true
})
return
@@ -43,10 +50,10 @@ func (s *Subject) Notify(job Event) {
if !ok {
return
}
- itf.(*Group).Notify(job)
+ f(itf.(*Group))
}
-func (s *Subject) Groups(name string) *Group {
+func (s *Poster) Groups(name string) *Group {
g, ok := s.groups.Get(name)
if !ok {
return nil
@@ -54,24 +61,24 @@ func (s *Subject) Groups(name string) *Group {
return g.(*Group)
}
-func (s *Subject) GetOrNewGroup(name string) *Group {
+func (s *Poster) GetOrNewGroup(name string) *Group {
item, _ := s.groups.Fetch(name, func() (interface{}, error) {
return NewGroup(name), nil
})
return item.(*Group)
}
-func (s *Subject) Remove(name string) {
+func (s *Poster) RemoveGroup(name string) {
s.groups.Remove(name)
}
-func (s *Subject) Size() int {
+func (s *Poster) Size() int {
return s.groups.Size()
}
-func NewSubject(name string) *Subject {
- return &Subject{
- name: name,
- groups: util.NewConcurrentMap(0),
+func NewPoster(subject string) *Poster {
+ return &Poster{
+ subject: subject,
+ groups: util.NewConcurrentMap(0),
}
}
diff --git a/pkg/notify/subject_test.go b/pkg/event/poster_test.go
similarity index 92%
rename from pkg/notify/subject_test.go
rename to pkg/event/poster_test.go
index c9e955c..931b5de 100644
--- a/pkg/notify/subject_test.go
+++ b/pkg/event/poster_test.go
@@ -14,13 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package notify
+package event
import "testing"
func TestSubject_Fetch(t *testing.T) {
- s := NewSubject("s1")
- if s.Name() != "s1" {
+ s := NewPoster("s1")
+ if s.Subject() != "s1" {
t.Fatalf("TestSubject_Fetch failed")
}
g := s.GetOrNewGroup("g1")
@@ -37,7 +37,7 @@ func TestSubject_Fetch(t *testing.T) {
if s.Size() != 2 {
t.Fatalf("TestSubject_Fetch failed")
}
- s.Remove(o.Name())
+ s.RemoveGroup(o.Name())
if s.Groups("g2") != nil {
t.Fatalf("TestSubject_Fetch failed")
}
@@ -47,19 +47,19 @@ func TestSubject_Fetch(t *testing.T) {
INSTANCE := RegisterType("INSTANCE", 1)
mock1 := &mockSubscriber{Subscriber: NewSubscriber(INSTANCE, "s1", "g1")}
mock2 := &mockSubscriber{Subscriber: NewSubscriber(INSTANCE, "s1", "g2")}
- g.AddSubscriber(mock1)
+ g.AddMember(mock1)
job := &baseEvent{group: "g3"}
- s.Notify(job)
+ s.Post(job)
if mock1.job != nil || mock2.job != nil {
t.Fatalf("TestSubject_Fetch failed")
}
job.group = "g1"
- s.Notify(job)
+ s.Post(job)
if mock1.job != job || mock2.job != nil {
t.Fatalf("TestSubject_Fetch failed")
}
job.group = ""
- s.Notify(job)
+ s.Post(job)
if mock1.job != job && mock2.job != job {
t.Fatalf("TestSubject_Fetch failed")
}
diff --git a/pkg/notify/subscriber.go b/pkg/event/subscriber.go
similarity index 67%
rename from pkg/notify/subscriber.go
rename to pkg/event/subscriber.go
index ccacb4a..b26060b 100644
--- a/pkg/notify/subscriber.go
+++ b/pkg/event/subscriber.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package notify
+package event
import (
"errors"
@@ -28,8 +28,8 @@ type Subscriber interface {
Subject() string
Group() string
Type() Type
- Service() *Service
- SetService(*Service)
+ Bus() *BusService
+ SetBus(*BusService)
Err() error
SetError(err error)
@@ -45,20 +45,20 @@ type baseSubscriber struct {
id string
subject string
group string
- service *Service
+ service *BusService
err error
}
-func (s *baseSubscriber) ID() string { return s.id }
-func (s *baseSubscriber) Subject() string { return s.subject }
-func (s *baseSubscriber) Group() string { return s.group }
-func (s *baseSubscriber) Type() Type { return s.nType }
-func (s *baseSubscriber) Service() *Service { return s.service }
-func (s *baseSubscriber) SetService(svc *Service) { s.service = svc }
-func (s *baseSubscriber) Err() error { return s.err }
-func (s *baseSubscriber) SetError(err error) { s.err = err }
-func (s *baseSubscriber) Close() {}
-func (s *baseSubscriber) OnAccept() {}
+func (s *baseSubscriber) ID() string { return s.id }
+func (s *baseSubscriber) Subject() string { return s.subject }
+func (s *baseSubscriber) Group() string { return s.group }
+func (s *baseSubscriber) Type() Type { return s.nType }
+func (s *baseSubscriber) Bus() *BusService { return s.service }
+func (s *baseSubscriber) SetBus(svc *BusService) { s.service = svc }
+func (s *baseSubscriber) Err() error { return s.err }
+func (s *baseSubscriber) SetError(err error) { s.err = err }
+func (s *baseSubscriber) Close() {}
+func (s *baseSubscriber) OnAccept() {}
func (s *baseSubscriber) OnMessage(job Event) {
s.SetError(errors.New("do not call base notifier OnMessage method"))
}
diff --git a/pkg/event/subscriber_checker.go b/pkg/event/subscriber_checker.go
new file mode 100644
index 0000000..28c4cb6
--- /dev/null
+++ b/pkg/event/subscriber_checker.go
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import "github.com/apache/servicecomb-service-center/pkg/log"
+
+const (
+ CheckerGroup = "__HealthChecker__"
+ CheckerSubject = "__NotifyServerHealthCheck__"
+)
+
+// SubscriberHealthChecker checks subscriber health and removes the subscriber from the bus if it returns an error
+type SubscriberHealthChecker struct {
+ Subscriber
+}
+
+type UnhealthyEvent struct {
+ Event
+ ErrorSubscriber Subscriber
+}
+
+func (s *SubscriberHealthChecker) OnMessage(evt Event) {
+ j := evt.(*UnhealthyEvent)
+ err := j.ErrorSubscriber.Err()
+
+ if j.ErrorSubscriber.Type() == INNER {
+ log.Errorf(nil, "remove %s watcher failed, here cause a dead lock, subject: %s, group: %s",
+ j.ErrorSubscriber.Type(), j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Group())
+ return
+ }
+
+ log.Debugf("notification service remove %s watcher, error: %v, subject: %s, group: %s",
+ j.ErrorSubscriber.Type(), err, j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Group())
+ s.Bus().RemoveSubscriber(j.ErrorSubscriber)
+}
+
+func NewSubscriberHealthChecker() *SubscriberHealthChecker {
+ return &SubscriberHealthChecker{
+ Subscriber: NewSubscriber(INNER, CheckerSubject, CheckerGroup),
+ }
+}
+
+func NewUnhealthyEvent(s Subscriber) *UnhealthyEvent {
+ return &UnhealthyEvent{
+ Event: NewEvent(INNER, CheckerSubject, CheckerGroup),
+ ErrorSubscriber: s,
+ }
+}
diff --git a/pkg/notify/group.go b/pkg/event/subscriber_group.go
similarity index 65%
rename from pkg/notify/group.go
rename to pkg/event/subscriber_group.go
index c600bba..0c41f4a 100644
--- a/pkg/notify/group.go
+++ b/pkg/event/subscriber_group.go
@@ -15,51 +15,51 @@
* limitations under the License.
*/
-package notify
+package event
import (
"github.com/apache/servicecomb-service-center/pkg/util"
)
type Group struct {
- name string
- subscribers *util.ConcurrentMap
+ name string
+ members *util.ConcurrentMap
}
func (g *Group) Name() string {
return g.name
}
-func (g *Group) Notify(job Event) {
- g.subscribers.ForEach(func(item util.MapItem) (next bool) {
- item.Value.(Subscriber).OnMessage(job)
- return true
- })
-}
-
-func (g *Group) Subscribers(name string) Subscriber {
- s, ok := g.subscribers.Get(name)
+func (g *Group) Member(name string) Subscriber {
+ s, ok := g.members.Get(name)
if !ok {
return nil
}
return s.(Subscriber)
}
-func (g *Group) AddSubscriber(subscriber Subscriber) Subscriber {
- return g.subscribers.PutIfAbsent(subscriber.ID(), subscriber).(Subscriber)
+func (g *Group) ForEach(iter func(m Subscriber)) {
+ g.members.ForEach(func(item util.MapItem) (next bool) {
+ iter(item.Value.(Subscriber))
+ return true
+ })
+}
+
+func (g *Group) AddMember(subscriber Subscriber) Subscriber {
+ return g.members.PutIfAbsent(subscriber.ID(), subscriber).(Subscriber)
}
-func (g *Group) Remove(name string) {
- g.subscribers.Remove(name)
+func (g *Group) RemoveMember(name string) {
+ g.members.Remove(name)
}
func (g *Group) Size() int {
- return g.subscribers.Size()
+ return g.members.Size()
}
func NewGroup(name string) *Group {
return &Group{
- name: name,
- subscribers: util.NewConcurrentMap(0),
+ name: name,
+ members: util.NewConcurrentMap(0),
}
}
diff --git a/pkg/notify/group_test.go b/pkg/event/subscriber_group_test.go
similarity index 79%
rename from pkg/notify/group_test.go
rename to pkg/event/subscriber_group_test.go
index 81040a7..b0dc22d 100644
--- a/pkg/notify/group_test.go
+++ b/pkg/event/subscriber_group_test.go
@@ -14,9 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package notify
+package event
-import "testing"
+import (
+ "testing"
+)
type mockSubscriber struct {
Subscriber
@@ -34,37 +36,32 @@ func TestGroup_Add(t *testing.T) {
if g.Name() != "g1" {
t.Fatalf("TestGroup_Add failed")
}
- if g.AddSubscriber(m) != m {
+ if g.AddMember(m) != m {
t.Fatalf("TestGroup_Add failed")
}
- if g.AddSubscriber(NewSubscriber(INSTANCE, "s1", "g1")) == m {
+ if g.AddMember(NewSubscriber(INSTANCE, "s1", "g1")) == m {
t.Fatalf("TestGroup_Add failed")
}
same := *(m.(*baseSubscriber))
- if g.AddSubscriber(&same) != m {
+ if g.AddMember(&same) != m {
t.Fatalf("TestGroup_Add failed")
}
if g.Size() != 2 {
t.Fatalf("TestGroup_Add failed")
}
- g.Remove(m.ID())
+ g.RemoveMember(m.ID())
if g.Size() != 1 {
t.Fatalf("TestGroup_Add failed")
}
- if g.Subscribers(m.ID()) == m {
+ if g.Member(m.ID()) == m {
t.Fatalf("TestGroup_Add failed")
}
mock := &mockSubscriber{Subscriber: NewSubscriber(INSTANCE, "s1", "g1")}
- if g.AddSubscriber(mock) != mock {
+ if g.AddMember(mock) != mock {
t.Fatalf("TestGroup_Add failed")
}
- if g.Subscribers(mock.ID()) != mock {
- t.Fatalf("TestGroup_Add failed")
- }
- job := &baseEvent{nType: INSTANCE}
- g.Notify(job)
- if mock.job != job {
+ if g.Member(mock.ID()) != mock {
t.Fatalf("TestGroup_Add failed")
}
}
diff --git a/pkg/notify/types.go b/pkg/event/types.go
similarity index 92%
rename from pkg/notify/types.go
rename to pkg/event/types.go
index 3cf125c..d42377b 100644
--- a/pkg/notify/types.go
+++ b/pkg/event/types.go
@@ -13,11 +13,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package notify
+package event
-import (
- "strconv"
-)
+import "strconv"
type Type int
@@ -42,9 +40,13 @@ func (nt Type) IsValid() bool {
return nt >= 0 && int(nt) < len(typeQueues)
}
-var typeNames []string
+var typeNames = []string{
+ INNER: "INNER",
+}
-var typeQueues []int
+var typeQueues = []int{
+ INNER: 0,
+}
func Types() (ts []Type) {
for i := range typeNames {
diff --git a/pkg/notify/types_test.go b/pkg/event/types_test.go
similarity index 88%
rename from pkg/notify/types_test.go
rename to pkg/event/types_test.go
index c209c31..35e5a66 100644
--- a/pkg/notify/types_test.go
+++ b/pkg/event/types_test.go
@@ -13,11 +13,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package notify
+package event
-import (
- "testing"
-)
+import "testing"
func TestRegisterType(t *testing.T) {
id := RegisterType("a", 0)
@@ -32,4 +30,7 @@ func TestRegisterType(t *testing.T) {
if id.String() != "Type999" || id.QueueSize() != DefaultQueueSize {
t.Fatal("TestRegisterType failed", id.String(), id.QueueSize())
}
+ if INNER.String() != "INNER" || INNER.QueueSize() != DefaultQueueSize {
+ t.Fatal("TestRegisterType failed", id.String(), id.QueueSize())
+ }
}
diff --git a/pkg/notify/subscriber_checker.go b/pkg/notify/subscriber_checker.go
deleted file mode 100644
index af73396..0000000
--- a/pkg/notify/subscriber_checker.go
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package notify
-
-import (
- "github.com/apache/servicecomb-service-center/pkg/log"
-)
-
-const (
- groupCheck = "__HealthChecker__"
- subjectCheck = "__SubscriberHealthCheck__"
-)
-
-var TypeCheck = RegisterType("CHECKER", DefaultQueueSize)
-
-//Notifier 健康检查
-type SubscriberChecker struct {
- Subscriber
-}
-
-type ErrEvent struct {
- Event
- Subscriber Subscriber
-}
-
-func (s *SubscriberChecker) OnMessage(evt Event) {
- j := evt.(*ErrEvent)
- err := j.Subscriber.Err()
-
- if j.Subscriber.Type() == TypeCheck {
- log.Errorf(nil, "remove %s subscriber failed, here cause a dead lock, subject: %s, group: %s",
- j.Subscriber.Type(), j.Subscriber.Subject(), j.Subscriber.Group())
- return
- }
-
- log.Debugf("notification service remove %s subscriber, error: %v, subject: %s, group: %s",
- j.Subscriber.Type(), err, j.Subscriber.Subject(), j.Subscriber.Group())
- s.Service().RemoveSubscriber(j.Subscriber)
-}
-
-func NewSubscriberChecker() *SubscriberChecker {
- return &SubscriberChecker{
- Subscriber: NewSubscriber(TypeCheck, subjectCheck, groupCheck),
- }
-}
-
-func NewErrEvent(by Subscriber) *ErrEvent {
- return &ErrEvent{
- Event: NewEvent(TypeCheck, subjectCheck, groupCheck),
- Subscriber: by,
- }
-}
diff --git a/pkg/queue/taskqueue.go b/pkg/queue/taskqueue.go
index 3eb65f6..1875ff2 100644
--- a/pkg/queue/taskqueue.go
+++ b/pkg/queue/taskqueue.go
@@ -30,7 +30,7 @@ type Worker interface {
}
type Task struct {
- Object interface{}
+ Payload interface{}
// Async can let workers handle this task concurrently, but
// it will make this task unordered
Async bool
@@ -62,13 +62,13 @@ func (q *TaskQueue) Do(ctx context.Context, task Task) {
if task.Async {
for _, w := range q.Workers {
q.goroutine.Do(func(ctx context.Context) {
- q.dispatch(ctx, w, task.Object)
+ q.dispatch(ctx, w, task.Payload)
})
}
return
}
for _, w := range q.Workers {
- q.dispatch(ctx, w, task.Object)
+ q.dispatch(ctx, w, task.Payload)
}
}
diff --git a/pkg/queue/taskqueue_test.go b/pkg/queue/taskqueue_test.go
index 8c79da4..d0eecd6 100644
--- a/pkg/queue/taskqueue_test.go
+++ b/pkg/queue/taskqueue_test.go
@@ -34,26 +34,26 @@ func TestNewEventQueue(t *testing.T) {
q := NewTaskQueue(0)
q.AddWorker(h)
- q.Do(context.Background(), Task{Object: 1})
+ q.Do(context.Background(), Task{Payload: 1})
if <-h.Object != 1 {
t.Fatalf("TestNewEventQueue failed")
}
- q.Do(context.Background(), Task{Object: 11, Async: true})
+ q.Do(context.Background(), Task{Payload: 11, Async: true})
if <-h.Object != 11 {
t.Fatalf("TestNewEventQueue failed")
}
q.Run()
- q.Add(Task{Object: 2})
+ q.Add(Task{Payload: 2})
if <-h.Object != 2 {
t.Fatalf("TestNewEventQueue failed")
}
- q.Add(Task{Object: 22, Async: true})
+ q.Add(Task{Payload: 22, Async: true})
if <-h.Object != 22 {
t.Fatalf("TestNewEventQueue failed")
}
q.Stop()
- q.Add(Task{Object: 3})
+ q.Add(Task{Payload: 3})
}
diff --git a/pkg/notify/common.go b/server/alarm/center.go
similarity index 88%
rename from pkg/notify/common.go
rename to server/alarm/center.go
index a6b5337..9af6aa8 100644
--- a/pkg/notify/common.go
+++ b/server/alarm/center.go
@@ -15,8 +15,11 @@
* limitations under the License.
*/
-package notify
+package alarm
-const (
- DefaultQueueSize = 1000
-)
+func Center() *Service {
+ once.Do(func() {
+ service = NewAlarmService()
+ })
+ return service
+}
diff --git a/server/alarm/common.go b/server/alarm/common.go
index 6d5eee3..cb9df7a 100644
--- a/server/alarm/common.go
+++ b/server/alarm/common.go
@@ -17,8 +17,7 @@ package alarm
import (
"fmt"
-
- "github.com/apache/servicecomb-service-center/pkg/notify"
+ "github.com/apache/servicecomb-service-center/pkg/event"
"github.com/apache/servicecomb-service-center/server/alarm/model"
)
@@ -43,7 +42,7 @@ const (
Group = "__ALARM_GROUP__"
)
-var ALARM = notify.RegisterType("ALARM", notify.DefaultQueueSize)
+var ALARM = event.RegisterType("ALARM", 0)
func FieldBool(key string, v bool) model.Field {
return model.Field{Key: key, Value: v}
diff --git a/server/alarm/model/types.go b/server/alarm/model/types.go
index 0c29d79..40d0a44 100644
--- a/server/alarm/model/types.go
+++ b/server/alarm/model/types.go
@@ -16,7 +16,7 @@
package model
import (
- nf "github.com/apache/servicecomb-service-center/pkg/notify"
+ nf "github.com/apache/servicecomb-service-center/pkg/event"
"github.com/apache/servicecomb-service-center/pkg/util"
)
diff --git a/server/alarm/service.go b/server/alarm/service.go
index 3aeeb3b..6b3cc1b 100644
--- a/server/alarm/service.go
+++ b/server/alarm/service.go
@@ -18,11 +18,11 @@ package alarm
import (
"sync"
+ nf "github.com/apache/servicecomb-service-center/pkg/event"
"github.com/apache/servicecomb-service-center/pkg/log"
- nf "github.com/apache/servicecomb-service-center/pkg/notify"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/alarm/model"
- "github.com/apache/servicecomb-service-center/server/notify"
+ "github.com/apache/servicecomb-service-center/server/event"
)
var (
@@ -45,7 +45,7 @@ func (ac *Service) Raise(id model.ID, fields ...model.Field) error {
for _, f := range fields {
ae.Fields[f.Key] = f.Value
}
- return notify.Center().Publish(ae)
+ return event.Center().Fire(ae)
}
func (ac *Service) Clear(id model.ID) error {
@@ -54,7 +54,7 @@ func (ac *Service) Clear(id model.ID) error {
Status: Cleared,
ID: id,
}
- return notify.Center().Publish(ae)
+ return event.Center().Fire(ae)
}
func (ac *Service) ListAll() (ls []*model.AlarmEvent) {
@@ -89,16 +89,9 @@ func NewAlarmService() *Service {
c := &Service{
Subscriber: nf.NewSubscriber(ALARM, Subject, Group),
}
- err := notify.Center().AddSubscriber(c)
+ err := event.Center().AddSubscriber(c)
if err != nil {
log.Error("", err)
}
return c
}
-
-func Center() *Service {
- once.Do(func() {
- service = NewAlarmService()
- })
- return service
-}
diff --git a/server/connection/grpc/stream.go b/server/connection/grpc/stream.go
index d8d0c1e..b74acc3 100644
--- a/server/connection/grpc/stream.go
+++ b/server/connection/grpc/stream.go
@@ -26,14 +26,14 @@ import (
"github.com/apache/servicecomb-service-center/pkg/proto"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/connection"
+ "github.com/apache/servicecomb-service-center/server/event"
"github.com/apache/servicecomb-service-center/server/metrics"
- "github.com/apache/servicecomb-service-center/server/notify"
pb "github.com/go-chassis/cari/discovery"
)
const GRPC = "gRPC"
-func Handle(watcher *notify.InstanceEventListWatcher, stream proto.ServiceInstanceCtrlWatchServer) (err error) {
+func Handle(watcher *event.InstanceEventListWatcher, stream proto.ServiceInstanceCtrlWatchServer) (err error) {
timer := time.NewTimer(connection.HeartbeatInterval)
defer timer.Stop()
for {
@@ -72,8 +72,8 @@ func Handle(watcher *notify.InstanceEventListWatcher, stream proto.ServiceInstan
func ListAndWatch(ctx context.Context, serviceID string, f func() ([]*pb.WatchInstanceResponse, int64), stream proto.ServiceInstanceCtrlWatchServer) (err error) {
domainProject := util.ParseDomainProject(ctx)
domain := util.ParseDomain(ctx)
- watcher := notify.NewInstanceEventListWatcher(serviceID, domainProject, f)
- err = notify.Center().AddSubscriber(watcher)
+ watcher := event.NewInstanceEventListWatcher(serviceID, domainProject, f)
+ err = event.Center().AddSubscriber(watcher)
if err != nil {
return
}
diff --git a/server/connection/grpc/stream_test.go b/server/connection/grpc/stream_test.go
index 33e8ee5..3381167 100644
--- a/server/connection/grpc/stream_test.go
+++ b/server/connection/grpc/stream_test.go
@@ -24,7 +24,7 @@ import (
"github.com/apache/servicecomb-service-center/pkg/log"
simple "github.com/apache/servicecomb-service-center/pkg/time"
stream "github.com/apache/servicecomb-service-center/server/connection/grpc"
- "github.com/apache/servicecomb-service-center/server/notify"
+ "github.com/apache/servicecomb-service-center/server/event"
pb "github.com/go-chassis/cari/discovery"
"google.golang.org/grpc"
)
@@ -42,13 +42,13 @@ func (x *grpcWatchServer) Context() context.Context {
}
func TestHandleWatchJob(t *testing.T) {
- w := notify.NewInstanceEventListWatcher("g", "s", nil)
+ w := event.NewInstanceEventListWatcher("g", "s", nil)
w.Job <- nil
err := stream.Handle(w, &grpcWatchServer{})
if err == nil {
t.Fatalf("TestHandleWatchJob failed")
}
- w.Job <- notify.NewInstanceEventWithTime("g", "s", 1, simple.FromTime(time.Now()), nil)
+ w.Job <- event.NewInstanceEventWithTime("g", "s", 1, simple.FromTime(time.Now()), nil)
w.Job <- nil
stream.Handle(w, &grpcWatchServer{})
}
diff --git a/server/connection/ws/websocket.go b/server/connection/ws/websocket.go
index fc3c386..57b064c 100644
--- a/server/connection/ws/websocket.go
+++ b/server/connection/ws/websocket.go
@@ -27,8 +27,8 @@ import (
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/connection"
+ "github.com/apache/servicecomb-service-center/server/event"
"github.com/apache/servicecomb-service-center/server/metrics"
- "github.com/apache/servicecomb-service-center/server/notify"
pb "github.com/go-chassis/cari/discovery"
"github.com/gorilla/websocket"
)
@@ -40,7 +40,7 @@ type WebSocket struct {
ticker *time.Ticker
conn *websocket.Conn
// watcher subscribes to the notification service events
- watcher *notify.InstanceEventListWatcher
+ watcher *event.InstanceEventListWatcher
needPingWatcher bool
free chan struct{}
closed chan struct{}
@@ -57,7 +57,7 @@ func (wh *WebSocket) Init() error {
remoteAddr := wh.conn.RemoteAddr().String()
// put in notification service queue
- if err := notify.Center().AddSubscriber(wh.watcher); err != nil {
+ if err := event.Center().AddSubscriber(wh.watcher); err != nil {
err = fmt.Errorf("establish[%s] websocket watch failed: notify service error, %s",
remoteAddr, err.Error())
log.Errorf(nil, err.Error())
@@ -228,7 +228,7 @@ func (wh *WebSocket) HandleEvent(o interface{}) {
log.Debugf("send 'Ping' message to watcher[%s], subject: %s, group: %s",
remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
return
- case *notify.InstanceEvent:
+ case *event.InstanceEvent:
resp := o.Response
providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, resp.Key.ServiceName, resp.Key.Version)
@@ -260,7 +260,7 @@ func (wh *WebSocket) HandleEvent(o interface{}) {
}
err := wh.WriteMessage(message)
- if evt, ok := o.(*notify.InstanceEvent); ok {
+ if evt, ok := o.(*event.InstanceEvent); ok {
metrics.ReportPublishCompleted(evt, err)
}
if err != nil {
@@ -295,7 +295,7 @@ func (wh *WebSocket) Stop() {
func ListAndWatch(ctx context.Context, serviceID string, f func() ([]*pb.WatchInstanceResponse, int64), conn *websocket.Conn) {
domainProject := util.ParseDomainProject(ctx)
domain := util.ParseDomain(ctx)
- socket := New(ctx, conn, notify.NewInstanceEventListWatcher(serviceID, domainProject, f))
+ socket := New(ctx, conn, event.NewInstanceEventListWatcher(serviceID, domainProject, f))
metrics.ReportSubscriber(domain, Websocket, 1)
process(socket)
@@ -320,7 +320,7 @@ func SendEstablishError(conn *websocket.Conn, err error) {
}
}
-func New(ctx context.Context, conn *websocket.Conn, watcher *notify.InstanceEventListWatcher) *WebSocket {
+func New(ctx context.Context, conn *websocket.Conn, watcher *event.InstanceEventListWatcher) *WebSocket {
return &WebSocket{
ctx: ctx,
conn: conn,
diff --git a/server/connection/ws/websocket_test.go b/server/connection/ws/websocket_test.go
index 2764084..7f21727 100644
--- a/server/connection/ws/websocket_test.go
+++ b/server/connection/ws/websocket_test.go
@@ -18,21 +18,19 @@ package ws_test
// initialize
import (
- "context"
- "errors"
-
_ "github.com/apache/servicecomb-service-center/test"
- wss "github.com/apache/servicecomb-service-center/server/connection/ws"
- "github.com/apache/servicecomb-service-center/server/core"
-
+ "context"
+ "errors"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
- . "github.com/apache/servicecomb-service-center/server/notify"
+ wss "github.com/apache/servicecomb-service-center/server/connection/ws"
+ "github.com/apache/servicecomb-service-center/server/core"
+ "github.com/apache/servicecomb-service-center/server/event"
"github.com/go-chassis/cari/discovery"
"github.com/gorilla/websocket"
)
@@ -71,7 +69,7 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
wss.SendEstablishError(conn, errors.New("error"))
- w := NewInstanceEventListWatcher("g", "s", func() (results []*discovery.WatchInstanceResponse, rev int64) {
+ w := event.NewInstanceEventListWatcher("g", "s", func() (results []*discovery.WatchInstanceResponse, rev int64) {
results = append(results, &discovery.WatchInstanceResponse{
Response: discovery.CreateResponse(discovery.ResponseSuccess, "ok"),
Action: string(discovery.EVT_CREATE),
@@ -87,12 +85,12 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
t.Fatalf("TestPublisher_Run")
}
- Center().Start()
+ event.Center().Start()
go func() {
wss.ListAndWatch(context.Background(), "", nil, conn)
- w2 := NewInstanceEventListWatcher("g", "s", func() (results []*discovery.WatchInstanceResponse, rev int64) {
+ w2 := event.NewInstanceEventListWatcher("g", "s", func() (results []*discovery.WatchInstanceResponse, rev int64) {
return
})
ws2 := wss.New(context.Background(), conn, w2)
@@ -105,9 +103,9 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
go ws.HandleControlMessage()
w.OnMessage(nil)
- w.OnMessage(&InstanceEvent{})
+ w.OnMessage(&event.InstanceEvent{})
- Center().Publish(NewInstanceEvent("g", "s", 1, &discovery.WatchInstanceResponse{
+ event.Center().Fire(event.NewInstanceEvent("g", "s", 1, &discovery.WatchInstanceResponse{
Response: discovery.CreateResponse(discovery.ResponseSuccess, "ok"),
Action: string(discovery.EVT_CREATE),
Key: &discovery.MicroServiceKey{},
diff --git a/server/notify/center.go b/server/event/center.go
similarity index 69%
rename from server/notify/center.go
rename to server/event/center.go
index 35fa0e5..732a762 100644
--- a/server/notify/center.go
+++ b/server/event/center.go
@@ -13,18 +13,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package notify
+package event
import (
- "github.com/apache/servicecomb-service-center/pkg/notify"
+ "github.com/apache/servicecomb-service-center/pkg/event"
)
-var notifyService *notify.Service
+var busService *event.BusService
func init() {
- notifyService = notify.NewNotifyService()
+ busService = event.NewBusService()
}
-func Center() *notify.Service {
- return notifyService
+// Center handles different types of events.
+// The event type can be 'ALARM' (biz alarms), 'RESOURCE' (resource changes, like INSTANCE) or
+// the inner type 'NOTIFY' (subscriber health check).
+func Center() *event.BusService {
+ return busService
}
diff --git a/server/notify/instance_subscriber.go b/server/event/instance_subscriber.go
similarity index 84%
rename from server/notify/instance_subscriber.go
rename to server/event/instance_subscriber.go
index ed295d8..f61f622 100644
--- a/server/notify/instance_subscriber.go
+++ b/server/event/instance_subscriber.go
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-package notify
+package event
import (
"context"
"time"
+ "github.com/apache/servicecomb-service-center/pkg/event"
"github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/pkg/notify"
simple "github.com/apache/servicecomb-service-center/pkg/time"
pb "github.com/go-chassis/cari/discovery"
)
@@ -33,17 +33,17 @@ const (
EventQueueSize = 5000
)
-var INSTANCE = notify.RegisterType("INSTANCE", EventQueueSize)
+var INSTANCE = event.RegisterType("INSTANCE", EventQueueSize)
// 状态变化推送
type InstanceEvent struct {
- notify.Event
+ event.Event
Revision int64
Response *pb.WatchInstanceResponse
}
type InstanceEventListWatcher struct {
- notify.Subscriber
+ event.Subscriber
Job chan *InstanceEvent
ListRevision int64
ListFunc func() (results []*pb.WatchInstanceResponse, rev int64)
@@ -53,7 +53,7 @@ type InstanceEventListWatcher struct {
func (w *InstanceEventListWatcher) SetError(err error) {
w.Subscriber.SetError(err)
// 触发清理job
- e := w.Service().Publish(notify.NewErrEvent(w))
+ e := w.Bus().Fire(event.NewUnhealthyEvent(w))
if e != nil {
log.Error("", e)
}
@@ -63,7 +63,7 @@ func (w *InstanceEventListWatcher) OnAccept() {
if w.Err() != nil {
return
}
- log.Debugf("accepted by notify service, %s watcher %s %s", w.Type(), w.Group(), w.Subject())
+ log.Debugf("accepted by event service, %s watcher %s %s", w.Type(), w.Group(), w.Subject())
gopool.Go(w.listAndPublishJobs)
}
@@ -80,7 +80,7 @@ func (w *InstanceEventListWatcher) listAndPublishJobs(_ context.Context) {
}
//被通知
-func (w *InstanceEventListWatcher) OnMessage(job notify.Event) {
+func (w *InstanceEventListWatcher) OnMessage(job event.Event) {
if w.Err() != nil {
return
}
@@ -106,7 +106,7 @@ func (w *InstanceEventListWatcher) OnMessage(job notify.Event) {
// the negative revision is specifically for the mongo scenario; it should be removed once mongo supports revisions.
if wJob.Revision >= 0 && wJob.Revision <= w.ListRevision {
- log.Warnf("unexpected notify %s job is coming in, watcher %s %s, job is %v, current revision is %v",
+ log.Warnf("unexpected event %s job is coming in, watcher %s %s, job is %v, current revision is %v",
w.Type(), w.Group(), w.Subject(), job, w.ListRevision)
return
}
@@ -140,7 +140,7 @@ func (w *InstanceEventListWatcher) Close() {
func NewInstanceEvent(serviceID, domainProject string, rev int64, response *pb.WatchInstanceResponse) *InstanceEvent {
return &InstanceEvent{
- Event: notify.NewEvent(INSTANCE, domainProject, serviceID),
+ Event: event.NewEvent(INSTANCE, domainProject, serviceID),
Revision: rev,
Response: response,
}
@@ -148,7 +148,7 @@ func NewInstanceEvent(serviceID, domainProject string, rev int64, response *pb.W
func NewInstanceEventWithTime(serviceID, domainProject string, rev int64, createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
return &InstanceEvent{
- Event: notify.NewEventWithTime(INSTANCE, domainProject, serviceID, createAt),
+ Event: event.NewEventWithTime(INSTANCE, domainProject, serviceID, createAt),
Revision: rev,
Response: response,
}
@@ -157,7 +157,7 @@ func NewInstanceEventWithTime(serviceID, domainProject string, rev int64, create
func NewInstanceEventListWatcher(serviceID, domainProject string,
listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *InstanceEventListWatcher {
watcher := &InstanceEventListWatcher{
- Subscriber: notify.NewSubscriber(INSTANCE, domainProject, serviceID),
+ Subscriber: event.NewSubscriber(INSTANCE, domainProject, serviceID),
Job: make(chan *InstanceEvent, INSTANCE.QueueSize()),
ListFunc: listFunc,
listCh: make(chan struct{}),
diff --git a/server/health/health_test.go b/server/health/health_test.go
index d99cfec..804df1e 100644
--- a/server/health/health_test.go
+++ b/server/health/health_test.go
@@ -16,15 +16,14 @@
package health
import (
+ "github.com/apache/servicecomb-service-center/server/alarm"
+ "github.com/apache/servicecomb-service-center/server/event"
"testing"
"time"
-
- "github.com/apache/servicecomb-service-center/server/alarm"
- "github.com/apache/servicecomb-service-center/server/notify"
)
func TestDefaultHealthChecker_Healthy(t *testing.T) {
- notify.Center().Start()
+ event.Center().Start()
// normal case
var hc DefaultHealthChecker
diff --git a/server/metrics/connection.go b/server/metrics/connection.go
index 11eed2b..aea7c97 100644
--- a/server/metrics/connection.go
+++ b/server/metrics/connection.go
@@ -18,10 +18,10 @@
package metrics
import (
+ "github.com/apache/servicecomb-service-center/pkg/event"
"time"
"github.com/apache/servicecomb-service-center/pkg/metrics"
- "github.com/apache/servicecomb-service-center/pkg/notify"
helper "github.com/apache/servicecomb-service-center/pkg/prometheus"
"github.com/prometheus/client_golang/prometheus"
)
@@ -53,7 +53,7 @@ var (
}, []string{"instance", "domain", "scheme"})
)
-func ReportPublishCompleted(evt notify.Event, err error) {
+func ReportPublishCompleted(evt event.Event, err error) {
instance := metrics.InstanceName()
elapsed := float64(time.Since(evt.CreateAt()).Nanoseconds()) / float64(time.Microsecond)
status := success
diff --git a/server/server.go b/server/server.go
index ac7dee7..8b6f099 100644
--- a/server/server.go
+++ b/server/server.go
@@ -19,23 +19,23 @@ package server
import (
"context"
+ "github.com/apache/servicecomb-service-center/server/event"
"net"
"os"
"strings"
"time"
"github.com/apache/servicecomb-service-center/datasource"
+ nf "github.com/apache/servicecomb-service-center/pkg/event"
"github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/metrics"
- nf "github.com/apache/servicecomb-service-center/pkg/notify"
"github.com/apache/servicecomb-service-center/pkg/plugin"
"github.com/apache/servicecomb-service-center/pkg/signal"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/command"
"github.com/apache/servicecomb-service-center/server/config"
"github.com/apache/servicecomb-service-center/server/core"
- "github.com/apache/servicecomb-service-center/server/notify"
"github.com/apache/servicecomb-service-center/server/plugin/security/tlsconf"
"github.com/apache/servicecomb-service-center/server/service/gov"
"github.com/apache/servicecomb-service-center/server/service/rbac"
@@ -64,7 +64,7 @@ type ServiceCenterServer struct {
GRPC endpoint
apiService *APIServer
- notifyService *nf.Service
+ eventCenter *nf.BusService
syncerNotifyService *snf.Service
}
@@ -96,7 +96,7 @@ func (s *ServiceCenterServer) initialize() {
// Datasource
s.initDatasource()
s.apiService = GetAPIServer()
- s.notifyService = notify.Center()
+ s.eventCenter = event.Center()
s.syncerNotifyService = snf.GetSyncerNotifyCenter()
}
@@ -181,7 +181,7 @@ func (s *ServiceCenterServer) initSSL() {
func (s *ServiceCenterServer) startServices() {
// notifications
- s.notifyService.Start()
+ s.eventCenter.Start()
// notify syncer
syncerEnabled := config.GetBool("syncer.enabled", false)
@@ -217,8 +217,8 @@ func (s *ServiceCenterServer) Stop() {
s.apiService.Stop()
}
- if s.notifyService != nil {
- s.notifyService.Stop()
+ if s.eventCenter != nil {
+ s.eventCenter.Stop()
}
if s.syncerNotifyService != nil {