You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/09/24 05:26:18 UTC
[dubbo-go] branch 3.0 updated: Refactor to remove event dispatcher
completely (#1368)
This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 7ed2c82 Refactor to remove event dispatcher completely (#1368)
7ed2c82 is described below
commit 7ed2c82519530b5acd042e4428a521fe4bd771ad
Author: ken.lj <ke...@gmail.com>
AuthorDate: Fri Sep 24 13:25:08 2021 +0800
Refactor to remove event dispatcher completely (#1368)
* delete event dispatcher
* delete event dispatcher
* delete todo tag
* polish code
* lock listener operation
---
common/extension/event_dispatcher.go | 75 ---------
common/extension/event_dispatcher_test.go | 110 -------------
.../observer/dispatcher/direct_event_dispatcher.go | 69 --------
.../dispatcher/direct_event_dispatcher_test.go | 77 ---------
.../observer/dispatcher/mock_event_dispatcher.go | 66 --------
common/observer/event_dispatcher.go | 27 ----
common/observer/listenable.go | 142 -----------------
common/observer/listenable_test.go | 64 --------
config/config_loader.go | 13 +-
config/config_loader_options.go | 23 ---
registry/etcdv3/service_discovery.go | 63 +++++---
.../customizable_service_instance_listener.go | 72 ---------
.../customizable_service_instance_listener_test.go | 72 ---------
.../event_publishing_service_deiscovery_test.go | 176 ---------------------
.../event/event_publishing_service_discovery.go | 156 ------------------
registry/event/log_event_listener.go | 61 -------
registry/event/log_event_listener_test.go | 32 ----
.../metadata_service_url_params_customizer.go | 3 +-
.../event/protocol_ports_metadata_customizer.go | 3 +-
registry/event/service_config_exported_event.go | 44 ------
registry/event/service_discovery_event.go | 103 ------------
registry/event/service_instance_event.go | 87 ----------
registry/event/service_name_mapping_listener.go | 89 -----------
registry/event/service_revision_customizer.go | 5 +-
registry/file/service_discovery.go | 17 --
registry/nacos/service_discovery.go | 58 ++++---
registry/nacos/service_discovery_test.go | 8 -
registry/service_discovery.go | 9 --
.../servicediscovery/service_discovery_registry.go | 41 +++--
.../service_discovery_registry_test.go | 7 -
registry/zookeeper/service_discovery.go | 62 +++++---
31 files changed, 158 insertions(+), 1676 deletions(-)
diff --git a/common/extension/event_dispatcher.go b/common/extension/event_dispatcher.go
deleted file mode 100644
index f2f6a6d..0000000
--- a/common/extension/event_dispatcher.go
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package extension
-
-import (
- "sync"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common/logger"
- "dubbo.apache.org/dubbo-go/v3/common/observer"
-)
-
-var (
- globalEventDispatcher observer.EventDispatcher
- initEventListeners []func() observer.EventListener
- initEventOnce sync.Once
-)
-
-var dispatchers = make(map[string]func() observer.EventDispatcher, 8)
-
-// SetEventDispatcher, actually, it doesn't really init the global dispatcher
-func SetEventDispatcher(name string, v func() observer.EventDispatcher) {
- dispatchers[name] = v
-}
-
-// SetAndInitGlobalDispatcher will actually init the global dispatcher
-// if there is already a global dispatcher,
-// it will be override
-// if the dispatcher with the name not found, it will panic
-func SetAndInitGlobalDispatcher(name string) {
- if len(name) == 0 {
- name = "direct"
- }
- if globalEventDispatcher != nil {
- logger.Warnf("EventDispatcher has been initialized. It will be replaced")
- }
-
- if dp, ok := dispatchers[name]; !ok || dp == nil {
- panic("EventDispatcher for " + name + " is not found, make sure you have import the package, " +
- "like import _ dubbo.apache.org/dubbo-go/v3/common/observer/dispatcher.")
- }
- globalEventDispatcher = dispatchers[name]()
-}
-
-// GetGlobalDispatcher will init all listener and then return dispatcher
-func GetGlobalDispatcher() observer.EventDispatcher {
- initEventOnce.Do(func() {
- // we should delay to add the listeners to avoid some listeners left
- for _, l := range initEventListeners {
- globalEventDispatcher.AddEventListener(l())
- }
- })
- return globalEventDispatcher
-}
-
-// AddEventListener it will be added in global event dispatcher
-func AddEventListener(creator func() observer.EventListener) {
- initEventListeners = append(initEventListeners, creator)
-}
diff --git a/common/extension/event_dispatcher_test.go b/common/extension/event_dispatcher_test.go
deleted file mode 100644
index a6baa86..0000000
--- a/common/extension/event_dispatcher_test.go
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package extension
-
-import (
- "reflect"
- "testing"
-)
-
-import (
- "github.com/stretchr/testify/assert"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common/observer"
-)
-
-func TestSetAndInitGlobalDispatcher(t *testing.T) {
- mock := &mockEventDispatcher{}
- SetEventDispatcher("mock", func() observer.EventDispatcher {
- return mock
- })
-
- SetAndInitGlobalDispatcher("mock")
- dispatcher := GetGlobalDispatcher()
- assert.NotNil(t, dispatcher)
- assert.Equal(t, mock, dispatcher)
-
- mock1 := &mockEventDispatcher{}
-
- SetEventDispatcher("mock1", func() observer.EventDispatcher {
- return mock1
- })
-
- SetAndInitGlobalDispatcher("mock1")
- dispatcher = GetGlobalDispatcher()
- assert.NotNil(t, dispatcher)
- assert.Equal(t, mock1, dispatcher)
-}
-
-func TestAddEventListener(t *testing.T) {
- AddEventListener(func() observer.EventListener {
- return &mockEventListener{}
- })
-
- AddEventListener(func() observer.EventListener {
- return &mockEventListener{}
- })
-
- assert.Equal(t, 2, len(initEventListeners))
-}
-
-type mockEventListener struct{}
-
-func (m mockEventListener) GetPriority() int {
- panic("implement me")
-}
-
-func (m mockEventListener) OnEvent(e observer.Event) error {
- panic("implement me")
-}
-
-func (m mockEventListener) GetEventType() reflect.Type {
- panic("implement me")
-}
-
-type mockEventDispatcher struct{}
-
-func (m mockEventDispatcher) AddEventListener(listener observer.EventListener) {
- panic("implement me")
-}
-
-func (m mockEventDispatcher) AddEventListeners(listenersSlice []observer.EventListener) {
- panic("implement me")
-}
-
-func (m mockEventDispatcher) RemoveEventListener(listener observer.EventListener) {
- panic("implement me")
-}
-
-func (m mockEventDispatcher) RemoveEventListeners(listenersSlice []observer.EventListener) {
- panic("implement me")
-}
-
-func (m mockEventDispatcher) GetAllEventListeners() []observer.EventListener {
- panic("implement me")
-}
-
-func (m mockEventDispatcher) RemoveAllEventListeners() {
- panic("implement me")
-}
-
-func (m mockEventDispatcher) Dispatch(event observer.Event) {
- panic("implement me")
-}
diff --git a/common/observer/dispatcher/direct_event_dispatcher.go b/common/observer/dispatcher/direct_event_dispatcher.go
deleted file mode 100644
index 4fc0a10..0000000
--- a/common/observer/dispatcher/direct_event_dispatcher.go
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package dispatcher
-
-import (
- "reflect"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common/extension"
- "dubbo.apache.org/dubbo-go/v3/common/logger"
- "dubbo.apache.org/dubbo-go/v3/common/observer"
-)
-
-func init() {
- extension.SetEventDispatcher("direct", NewDirectEventDispatcher)
-}
-
-// DirectEventDispatcher is align with DirectEventDispatcher interface in Java.
-// it's the top abstraction
-// Align with 2.7.5
-// Dispatcher event to listener direct
-type DirectEventDispatcher struct {
- observer.BaseListener
-}
-
-// NewDirectEventDispatcher ac constructor of DirectEventDispatcher
-func NewDirectEventDispatcher() observer.EventDispatcher {
- return &DirectEventDispatcher{
- BaseListener: observer.NewBaseListener(),
- }
-}
-
-// Dispatch event directly
-// it lookup the listener by event's type.
-// if listener not found, it just return and do nothing
-func (ded *DirectEventDispatcher) Dispatch(event observer.Event) {
- if event == nil {
- logger.Warnf("[DirectEventDispatcher] dispatch event nil")
- return
- }
- eventType := reflect.TypeOf(event).Elem()
- ded.Mutex.RLock()
- defer ded.Mutex.RUnlock()
- listenersSlice, loaded := ded.ListenersCache[eventType]
- if !loaded {
- return
- }
- for _, listener := range listenersSlice {
- if err := listener.OnEvent(event); err != nil {
- logger.Warnf("[DirectEventDispatcher] dispatch event error:%v", err)
- }
- }
-}
diff --git a/common/observer/dispatcher/direct_event_dispatcher_test.go b/common/observer/dispatcher/direct_event_dispatcher_test.go
deleted file mode 100644
index ba183a6..0000000
--- a/common/observer/dispatcher/direct_event_dispatcher_test.go
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package dispatcher
-
-import (
- "fmt"
- "reflect"
- "testing"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common/observer"
-)
-
-func TestDirectEventDispatcher_Dispatch(t *testing.T) {
- ded := NewDirectEventDispatcher()
- ded.AddEventListener(&TestEventListener{
- BaseListener: observer.NewBaseListener(),
- })
- ded.AddEventListener(&TestEventListener1{})
- ded.Dispatch(&TestEvent{})
- ded.Dispatch(nil)
-}
-
-type TestEvent struct {
- observer.BaseEvent
-}
-
-type TestEventListener struct {
- observer.BaseListener
- observer.EventListener
-}
-
-func (tel *TestEventListener) OnEvent(e observer.Event) error {
- fmt.Println("TestEventListener")
- return nil
-}
-
-func (tel *TestEventListener) GetPriority() int {
- return -1
-}
-
-func (tel *TestEventListener) GetEventType() reflect.Type {
- return reflect.TypeOf(&TestEvent{})
-}
-
-type TestEventListener1 struct {
- observer.EventListener
-}
-
-func (tel *TestEventListener1) OnEvent(e observer.Event) error {
- fmt.Println("TestEventListener1")
- return nil
-}
-
-func (tel *TestEventListener1) GetPriority() int {
- return 1
-}
-
-func (tel *TestEventListener1) GetEventType() reflect.Type {
- return reflect.TypeOf(TestEvent{})
-}
diff --git a/common/observer/dispatcher/mock_event_dispatcher.go b/common/observer/dispatcher/mock_event_dispatcher.go
deleted file mode 100644
index 1835448..0000000
--- a/common/observer/dispatcher/mock_event_dispatcher.go
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package dispatcher
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common/observer"
-)
-
-// MockEventDispatcher will do nothing.
-// It is only used by tests
-// Now the implementation doing nothing,
-// But you can modify this if needed
-type MockEventDispatcher struct {
- Notify chan struct{}
- Event observer.Event
-}
-
-func NewMockEventDispatcher() *MockEventDispatcher {
- return &MockEventDispatcher{Notify: make(chan struct{}, 1)}
-}
-
-// AddEventListener do nothing
-func (m *MockEventDispatcher) AddEventListener(listener observer.EventListener) {
-}
-
-// AddEventListeners do nothing
-func (m *MockEventDispatcher) AddEventListeners(listenersSlice []observer.EventListener) {
-}
-
-// RemoveEventListener do nothing
-func (m *MockEventDispatcher) RemoveEventListener(listener observer.EventListener) {
-}
-
-// RemoveEventListeners do nothing
-func (m *MockEventDispatcher) RemoveEventListeners(listenersSlice []observer.EventListener) {
-}
-
-// GetAllEventListeners return empty list
-func (m *MockEventDispatcher) GetAllEventListeners() []observer.EventListener {
- return make([]observer.EventListener, 0)
-}
-
-// RemoveAllEventListeners do nothing
-func (m *MockEventDispatcher) RemoveAllEventListeners() {
-}
-
-// Dispatch do nothing
-func (m *MockEventDispatcher) Dispatch(event observer.Event) {
- m.Event = event
- m.Notify <- struct{}{}
-}
diff --git a/common/observer/event_dispatcher.go b/common/observer/event_dispatcher.go
deleted file mode 100644
index 17745e6..0000000
--- a/common/observer/event_dispatcher.go
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package observer
-
-// EventDispatcher is align with EventDispatcher interface in Java.
-// it's the top abstraction
-// Align with 2.7.5
-type EventDispatcher interface {
- Listenable
- // Dispatch event
- Dispatch(event Event)
-}
diff --git a/common/observer/listenable.go b/common/observer/listenable.go
deleted file mode 100644
index bf6ae72..0000000
--- a/common/observer/listenable.go
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package observer
-
-import (
- "reflect"
- "sort"
- "sync"
-)
-
-// Listenable could add and remove the event listener
-type Listenable interface {
- AddEventListener(listener EventListener)
- AddEventListeners(listenersSlice []EventListener)
- RemoveEventListener(listener EventListener)
- RemoveEventListeners(listenersSlice []EventListener)
- GetAllEventListeners() []EventListener
- RemoveAllEventListeners()
-}
-
-// BaseListener base listenable
-type BaseListener struct {
- Listenable
- ListenersCache map[reflect.Type][]EventListener
- Mutex sync.RWMutex
-}
-
-// NewBaseListener a constructor of base listenable
-func NewBaseListener() BaseListener {
- return BaseListener{
- ListenersCache: make(map[reflect.Type][]EventListener, 8),
- }
-}
-
-// AddEventListener add event listener
-func (bl *BaseListener) AddEventListener(listener EventListener) {
- eventType := listener.GetEventType()
- if eventType.Kind() == reflect.Ptr {
- eventType = eventType.Elem()
- }
- bl.Mutex.Lock()
- defer bl.Mutex.Unlock()
- listenersSlice, loaded := bl.ListenersCache[eventType]
- if !loaded {
- listenersSlice = make([]EventListener, 0, 8)
- }
- // return if listenersSlice already has this listener
- if loaded && containListener(listenersSlice, listener) {
- return
- }
- listenersSlice = append(listenersSlice, listener)
- sort.Slice(listenersSlice, func(i, j int) bool {
- return listenersSlice[i].GetPriority() < listenersSlice[j].GetPriority()
- })
- bl.ListenersCache[eventType] = listenersSlice
-}
-
-// AddEventListeners add the slice of event listener
-func (bl *BaseListener) AddEventListeners(listenersSlice []EventListener) {
- for _, listener := range listenersSlice {
- bl.AddEventListener(listener)
- }
-}
-
-// RemoveEventListener remove the event listener
-func (bl *BaseListener) RemoveEventListener(listener EventListener) {
- eventType := listener.GetEventType()
- if eventType.Kind() == reflect.Ptr {
- eventType = eventType.Elem()
- }
- bl.Mutex.Lock()
- defer bl.Mutex.Unlock()
- listenersSlice, loaded := bl.ListenersCache[eventType]
- if !loaded {
- return
- }
- for i, l := range listenersSlice {
- if l == listener {
- listenersSlice = append(listenersSlice[:i], listenersSlice[i+1:]...)
- }
- }
- bl.ListenersCache[eventType] = listenersSlice
-}
-
-// RemoveEventListeners remove the slice of event listener
-// it will iterate all listener and remove it one by one
-func (bl *BaseListener) RemoveEventListeners(listenersSlice []EventListener) {
- for _, listener := range listenersSlice {
- bl.RemoveEventListener(listener)
- }
-}
-
-// RemoveAllEventListeners remove all
-// using Lock
-func (bl *BaseListener) RemoveAllEventListeners() {
- bl.Mutex.Lock()
- defer bl.Mutex.Unlock()
- bl.ListenersCache = make(map[reflect.Type][]EventListener, 8)
-}
-
-// GetAllEventListeners get all listener
-// using RLock
-func (bl *BaseListener) GetAllEventListeners() []EventListener {
- allListenersSlice := make([]EventListener, 0, 16)
-
- bl.Mutex.RLock()
- defer bl.Mutex.RUnlock()
- for _, listenersSlice := range bl.ListenersCache {
- allListenersSlice = append(allListenersSlice, listenersSlice...)
- }
- sort.Slice(allListenersSlice, func(i, j int) bool {
- return allListenersSlice[i].GetPriority() < allListenersSlice[j].GetPriority()
- })
- return allListenersSlice
-}
-
-// containListener true if contain listener
-// it's not thread safe
-// usually it should be use in lock scope
-func containListener(listenersSlice []EventListener, listener EventListener) bool {
- for _, loadListener := range listenersSlice {
- if loadListener == listener {
- return true
- }
- }
- return false
-}
diff --git a/common/observer/listenable_test.go b/common/observer/listenable_test.go
deleted file mode 100644
index 3ae6e5e..0000000
--- a/common/observer/listenable_test.go
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package observer
-
-import (
- "reflect"
- "testing"
-)
-
-import (
- "github.com/stretchr/testify/assert"
-)
-
-func TestListenable(t *testing.T) {
- el := &TestEventListener{}
- bl := NewBaseListener()
- b := &bl
- b.AddEventListener(el)
- b.AddEventListener(el)
- al := b.GetAllEventListeners()
- assert.Equal(t, len(al), 1)
- assert.Equal(t, al[0].GetEventType(), reflect.TypeOf(TestEvent{}))
- b.RemoveEventListener(el)
- assert.Equal(t, len(b.GetAllEventListeners()), 0)
- var ts []EventListener
- ts = append(ts, el)
- b.AddEventListeners(ts)
- assert.Equal(t, len(al), 1)
-}
-
-type TestEvent struct {
- BaseEvent
-}
-
-type TestEventListener struct {
- EventListener
-}
-
-func (tel *TestEventListener) OnEvent(e Event) error {
- return nil
-}
-
-func (tel *TestEventListener) GetPriority() int {
- return -1
-}
-
-func (tel *TestEventListener) GetEventType() reflect.Type {
- return reflect.TypeOf(TestEvent{})
-}
diff --git a/config/config_loader.go b/config/config_loader.go
index a87abf4..cd4f171 100644
--- a/config/config_loader.go
+++ b/config/config_loader.go
@@ -38,7 +38,6 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
- _ "dubbo.apache.org/dubbo-go/v3/common/observer/dispatcher"
"dubbo.apache.org/dubbo-go/v3/common/yaml"
"dubbo.apache.org/dubbo-go/v3/registry"
)
@@ -90,7 +89,7 @@ func DefaultInit() []LoaderInitOption {
if confRouterFile == "" {
confRouterFile = constant.DEFAULT_ROUTER_CONF_FILE_PATH
}
- return []LoaderInitOption{RouterInitOption(confRouterFile), BaseInitOption(""), ConsumerInitOption(confConFile), ProviderInitOption(confProFile)}
+ return []LoaderInitOption{RouterInitOption(confRouterFile), ConsumerInitOption(confConFile), ProviderInitOption(confProFile)}
}
// setDefaultValue set default value for providerConfig or consumerConfig if it is null
@@ -325,7 +324,7 @@ func createInstance(url *common.URL) (registry.ServiceInstance, error) {
metadata := make(map[string]string, 8)
metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType
- return &registry.DefaultServiceInstance{
+ instance := &registry.DefaultServiceInstance{
ServiceName: appConfig.Name,
Host: host,
Port: int(port),
@@ -333,7 +332,13 @@ func createInstance(url *common.URL) (registry.ServiceInstance, error) {
Enable: true,
Healthy: true,
Metadata: metadata,
- }, nil
+ }
+
+ for _, cus := range extension.GetCustomizers() {
+ cus.Customize(instance)
+ }
+
+ return instance, nil
}
// selectMetadataServiceExportedURL get already be exported url
diff --git a/config/config_loader_options.go b/config/config_loader_options.go
index bc8aff1..f519bec 100644
--- a/config/config_loader_options.go
+++ b/config/config_loader_options.go
@@ -21,10 +21,6 @@ import (
"log"
)
-import (
- "dubbo.apache.org/dubbo-go/v3/common/extension"
-)
-
type LoaderInitOption interface {
init()
apply()
@@ -117,22 +113,3 @@ func RouterInitOption(crf string) LoaderInitOption {
},
}
}
-
-func BaseInitOption(cbf string) LoaderInitOption {
- return &optionFunc{
- func() {
- if cbf == "" {
- return
- }
- confBaseFile = cbf
- if err := BaseInit(cbf); err != nil {
- log.Printf("[BaseInit] %#v", err)
- baseConfig = nil
- }
- },
- func() {
- // init the global event dispatcher
- extension.SetAndInitGlobalDispatcher(GetBaseConfig().EventDispatcherType)
- },
- }
-}
diff --git a/registry/etcdv3/service_discovery.go b/registry/etcdv3/service_discovery.go
index 85679f2..a081917 100644
--- a/registry/etcdv3/service_discovery.go
+++ b/registry/etcdv3/service_discovery.go
@@ -65,7 +65,8 @@ type etcdV3ServiceDiscovery struct {
// services is when register or update will add service name
services *gxset.HashSet
// child listener
- childListenerMap map[string]*etcdv3.EventListener
+ childListenerMap map[string]*etcdv3.EventListener
+ instanceListenerMap map[string]*gxset.HashSet
}
// basic information of this instance
@@ -217,28 +218,16 @@ func (e *etcdV3ServiceDiscovery) GetRequestInstances(serviceNames []string, offs
// see addServiceInstancesChangedListener in Java
func (e *etcdV3ServiceDiscovery) AddListener(listener registry.ServiceInstancesChangedListener) error {
for _, t := range listener.GetServiceNames().Values() {
- serviceName := t.(string)
- err := e.registerSreviceWatcher(serviceName)
+ err := e.registerServiceInstanceListener(t.(string), listener)
if err != nil {
return err
}
- }
- return nil
-}
-// DispatchEventByServiceName dispatches the ServiceInstancesChangedEvent to service instance whose name is serviceName
-func (e *etcdV3ServiceDiscovery) DispatchEventByServiceName(serviceName string) error {
- return e.DispatchEventForInstances(serviceName, e.GetInstances(serviceName))
-}
-
-// DispatchEventForInstances dispatches the ServiceInstancesChangedEvent to target instances
-func (e *etcdV3ServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
- return e.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
-}
-
-// DispatchEvent dispatches the event
-func (e *etcdV3ServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
- extension.GetGlobalDispatcher().Dispatch(event)
+ err = e.registerServiceWatcher(t.(string))
+ if err != nil {
+ return err
+ }
+ }
return nil
}
@@ -256,8 +245,24 @@ func toParentPath(serviceName string) string {
return ROOT + constant.PATH_SEPARATOR + serviceName
}
+// register service instance listener, instance listener and watcher are matched through serviceName
+func (e *etcdV3ServiceDiscovery) registerServiceInstanceListener(serviceName string, listener registry.ServiceInstancesChangedListener) error {
+ initLock.Lock()
+ defer initLock.Unlock()
+
+ set, found := e.instanceListenerMap[serviceName]
+ if !found {
+ set = gxset.NewSet(listener)
+ set.Add(listener)
+ e.instanceListenerMap[serviceName] = set
+ return nil
+ }
+ set.Add(listener)
+ return nil
+}
+
// register service watcher
-func (e *etcdV3ServiceDiscovery) registerSreviceWatcher(serviceName string) error {
+func (e *etcdV3ServiceDiscovery) registerServiceWatcher(serviceName string) error {
initLock.Lock()
defer initLock.Unlock()
@@ -284,7 +289,15 @@ func (e *etcdV3ServiceDiscovery) DataChange(eventType remoting.Event) bool {
instance.ServiceName = ""
}
- if err := e.DispatchEventByServiceName(instance.ServiceName); err != nil {
+ // notify instance listener instance change
+ name := instance.ServiceName
+ instances := e.GetInstances(name)
+ for _, lis := range e.instanceListenerMap[instance.ServiceName].Values() {
+ var instanceLis registry.ServiceInstancesChangedListener
+ instanceLis = lis.(registry.ServiceInstancesChangedListener)
+ err = instanceLis.OnEvent(registry.NewServiceInstancesChangedEvent(name, instances))
+ }
+ if err != nil {
return false
}
}
@@ -324,5 +337,11 @@ func newEtcdV3ServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
descriptor := fmt.Sprintf("etcd-service-discovery[%s]", remoteConfig.Address)
- return &etcdV3ServiceDiscovery{descriptor, client, nil, gxset.NewSet(), make(map[string]*etcdv3.EventListener)}, nil
+ return &etcdV3ServiceDiscovery{
+ descriptor,
+ client,
+ nil,
+ gxset.NewSet(),
+ make(map[string]*etcdv3.EventListener),
+ make(map[string]*gxset.HashSet)}, nil
}
diff --git a/registry/event/customizable_service_instance_listener.go b/registry/event/customizable_service_instance_listener.go
deleted file mode 100644
index e38199f..0000000
--- a/registry/event/customizable_service_instance_listener.go
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package event
-
-import (
- "reflect"
- "sync"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common/extension"
- "dubbo.apache.org/dubbo-go/v3/common/observer"
-)
-
-func init() {
- extension.AddEventListener(GetCustomizableServiceInstanceListener)
-}
-
-// customizableServiceInstanceListener is singleton
-type customizableServiceInstanceListener struct{}
-
-// GetPriority return priority 9999,
-// 9999 is big enough to make sure it will be last invoked
-func (c *customizableServiceInstanceListener) GetPriority() int {
- return 9999
-}
-
-// OnEvent if the event is ServiceInstancePreRegisteredEvent
-// it will iterate all ServiceInstanceCustomizer instances
-// or it will do nothing
-func (c *customizableServiceInstanceListener) OnEvent(e observer.Event) error {
- if preRegEvent, ok := e.(*ServiceInstancePreRegisteredEvent); ok {
- for _, cus := range extension.GetCustomizers() {
- cus.Customize(preRegEvent.serviceInstance)
- }
- }
- return nil
-}
-
-// GetEventType will return ServiceInstancePreRegisteredEvent
-func (c *customizableServiceInstanceListener) GetEventType() reflect.Type {
- return reflect.TypeOf(&ServiceInstancePreRegisteredEvent{})
-}
-
-var (
- customizableServiceInstanceListenerInstance *customizableServiceInstanceListener
- customizableServiceInstanceListenerOnce sync.Once
-)
-
-// GetCustomizableServiceInstanceListener returns an instance
-// if the instance was not initialized, we create one
-func GetCustomizableServiceInstanceListener() observer.EventListener {
- customizableServiceInstanceListenerOnce.Do(func() {
- customizableServiceInstanceListenerInstance = &customizableServiceInstanceListener{}
- })
- return customizableServiceInstanceListenerInstance
-}
diff --git a/registry/event/customizable_service_instance_listener_test.go b/registry/event/customizable_service_instance_listener_test.go
deleted file mode 100644
index 963f0aa..0000000
--- a/registry/event/customizable_service_instance_listener_test.go
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package event
-
-import (
- "testing"
- "time"
-)
-
-import (
- "github.com/stretchr/testify/assert"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common/extension"
- "dubbo.apache.org/dubbo-go/v3/registry"
-)
-
-func TestGetCustomizableServiceInstanceListener(t *testing.T) {
-
- cus := GetCustomizableServiceInstanceListener()
-
- assert.Equal(t, 9999, cus.GetPriority())
-
- extension.AddCustomizers(&mockCustomizer{})
-
- err := cus.OnEvent(&mockEvent{})
- assert.Nil(t, err)
- err = cus.OnEvent(NewServiceInstancePreRegisteredEvent("hello", createInstance()))
- assert.Nil(t, err)
-
- tp := cus.GetEventType()
- assert.NotNil(t, tp)
-}
-
-type mockEvent struct{}
-
-func (m *mockEvent) String() string {
- panic("implement me")
-}
-
-func (m *mockEvent) GetSource() interface{} {
- panic("implement me")
-}
-
-func (m *mockEvent) GetTimestamp() time.Time {
- panic("implement me")
-}
-
-type mockCustomizer struct{}
-
-func (m *mockCustomizer) GetPriority() int {
- return 0
-}
-
-func (m *mockCustomizer) Customize(instance registry.ServiceInstance) {
-}
diff --git a/registry/event/event_publishing_service_deiscovery_test.go b/registry/event/event_publishing_service_deiscovery_test.go
deleted file mode 100644
index 13983b1..0000000
--- a/registry/event/event_publishing_service_deiscovery_test.go
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package event
-
-import (
- "reflect"
- "testing"
-)
-
-import (
- gxset "github.com/dubbogo/gost/container/set"
- gxpage "github.com/dubbogo/gost/hash/page"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/suite"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common/extension"
- "dubbo.apache.org/dubbo-go/v3/common/observer"
- "dubbo.apache.org/dubbo-go/v3/common/observer/dispatcher"
- "dubbo.apache.org/dubbo-go/v3/config"
- "dubbo.apache.org/dubbo-go/v3/metadata/mapping"
- _ "dubbo.apache.org/dubbo-go/v3/metadata/service/local"
- "dubbo.apache.org/dubbo-go/v3/registry"
-)
-
-func TestEventPublishingServiceDiscovery_DispatchEvent(t *testing.T) {
- // extension.SetMetadataService("local", local.NewMetadataService)
-
- config.GetApplicationConfig().MetadataType = "local"
-
- extension.SetGlobalServiceNameMapping(func() mapping.ServiceNameMapping {
- return mapping.NewMockServiceNameMapping()
- })
-
- dc := NewEventPublishingServiceDiscovery(&ServiceDiscoveryA{})
- tsd := &TestServiceDiscoveryDestroyingEventListener{
- BaseListener: observer.NewBaseListener(),
- }
- tsd.SetT(t)
- tsi := &TestServiceInstancePreRegisteredEventListener{}
- tsi.SetT(t)
- extension.AddEventListener(func() observer.EventListener {
- return tsd
- })
- extension.AddEventListener(func() observer.EventListener {
- return tsi
- })
- extension.SetEventDispatcher("direct", dispatcher.NewDirectEventDispatcher)
- extension.SetAndInitGlobalDispatcher("direct")
- err := dc.Destroy()
- assert.Nil(t, err)
- si := &registry.DefaultServiceInstance{ID: "testServiceInstance"}
- err = dc.Register(si)
- assert.Nil(t, err)
-}
-
-type TestServiceDiscoveryDestroyingEventListener struct {
- suite.Suite
- observer.BaseListener
-}
-
-func (tel *TestServiceDiscoveryDestroyingEventListener) OnEvent(e observer.Event) error {
- e1, ok := e.(*ServiceDiscoveryDestroyingEvent)
- assert.Equal(tel.T(), ok, true)
- assert.Equal(tel.T(), "testServiceDiscovery", e1.GetOriginal().String())
- assert.Equal(tel.T(), "testServiceDiscovery", e1.GetServiceDiscovery().String())
- return nil
-}
-
-func (tel *TestServiceDiscoveryDestroyingEventListener) GetPriority() int {
- return -1
-}
-
-func (tel *TestServiceDiscoveryDestroyingEventListener) GetEventType() reflect.Type {
- return reflect.TypeOf(ServiceDiscoveryDestroyingEvent{})
-}
-
-type TestServiceInstancePreRegisteredEventListener struct {
- suite.Suite
- observer.BaseListener
-}
-
-func (tel *TestServiceInstancePreRegisteredEventListener) OnEvent(e observer.Event) error {
- e1, ok := e.(*ServiceInstancePreRegisteredEvent)
- assert.Equal(tel.T(), ok, true)
- assert.Equal(tel.T(), "testServiceInstance", e1.getServiceInstance().GetID())
- return nil
-}
-
-func (tel *TestServiceInstancePreRegisteredEventListener) GetPriority() int {
- return -1
-}
-
-func (tel *TestServiceInstancePreRegisteredEventListener) GetEventType() reflect.Type {
- return reflect.TypeOf(ServiceInstancePreRegisteredEvent{})
-}
-
-type ServiceDiscoveryA struct{}
-
-// String return mockServiceDiscovery
-func (msd *ServiceDiscoveryA) String() string {
- return "testServiceDiscovery"
-}
-
-// Destroy do nothing
-func (msd *ServiceDiscoveryA) Destroy() error {
- return nil
-}
-
-func (msd *ServiceDiscoveryA) Register(instance registry.ServiceInstance) error {
- return nil
-}
-
-func (msd *ServiceDiscoveryA) Update(instance registry.ServiceInstance) error {
- return nil
-}
-
-func (msd *ServiceDiscoveryA) Unregister(instance registry.ServiceInstance) error {
- return nil
-}
-
-func (msd *ServiceDiscoveryA) GetDefaultPageSize() int {
- return 1
-}
-
-func (msd *ServiceDiscoveryA) GetServices() *gxset.HashSet {
- return nil
-}
-
-func (msd *ServiceDiscoveryA) GetInstances(serviceName string) []registry.ServiceInstance {
- return nil
-}
-
-func (msd *ServiceDiscoveryA) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
- return nil
-}
-
-func (msd *ServiceDiscoveryA) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager {
- return nil
-}
-
-func (msd *ServiceDiscoveryA) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
- return nil
-}
-
-func (msd *ServiceDiscoveryA) AddListener(listener registry.ServiceInstancesChangedListener) error {
- return nil
-}
-
-func (msd *ServiceDiscoveryA) DispatchEventByServiceName(serviceName string) error {
- return nil
-}
-
-func (msd *ServiceDiscoveryA) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
- return nil
-}
-
-func (msd *ServiceDiscoveryA) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
- return nil
-}
diff --git a/registry/event/event_publishing_service_discovery.go b/registry/event/event_publishing_service_discovery.go
deleted file mode 100644
index b360dab..0000000
--- a/registry/event/event_publishing_service_discovery.go
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package event
-
-import (
- gxset "github.com/dubbogo/gost/container/set"
- gxpage "github.com/dubbogo/gost/hash/page"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common/extension"
- "dubbo.apache.org/dubbo-go/v3/common/observer"
- "dubbo.apache.org/dubbo-go/v3/metadata/service"
- "dubbo.apache.org/dubbo-go/v3/metadata/service/local"
- "dubbo.apache.org/dubbo-go/v3/registry"
-)
-
-// EventPublishingServiceDiscovery will enhance Service Discovery
-// Publish some event about service discovery
-type EventPublishingServiceDiscovery struct {
- serviceDiscovery registry.ServiceDiscovery
-}
-
-// NewEventPublishingServiceDiscovery is a constructor
-func NewEventPublishingServiceDiscovery(serviceDiscovery registry.ServiceDiscovery) *EventPublishingServiceDiscovery {
- return &EventPublishingServiceDiscovery{
- serviceDiscovery: serviceDiscovery,
- }
-}
-
-// String returns serviceDiscovery.String()
-func (epsd *EventPublishingServiceDiscovery) String() string {
- return epsd.serviceDiscovery.String()
-}
-
-// Destroy delegate function
-func (epsd *EventPublishingServiceDiscovery) Destroy() error {
- f := func() error {
- return epsd.serviceDiscovery.Destroy()
- }
- return epsd.executeWithEvents(NewServiceDiscoveryDestroyingEvent(epsd, epsd.serviceDiscovery),
- f, NewServiceDiscoveryDestroyedEvent(epsd, epsd.serviceDiscovery))
-}
-
-// Register delegate function
-func (epsd *EventPublishingServiceDiscovery) Register(instance registry.ServiceInstance) error {
- f := func() error {
- return epsd.serviceDiscovery.Register(instance)
- }
- return epsd.executeWithEvents(NewServiceInstancePreRegisteredEvent(epsd.serviceDiscovery, instance),
- f, NewServiceInstanceRegisteredEvent(epsd.serviceDiscovery, instance))
-}
-
-// Update returns the result of serviceDiscovery.Update
-func (epsd *EventPublishingServiceDiscovery) Update(instance registry.ServiceInstance) error {
- f := func() error {
- return epsd.serviceDiscovery.Update(instance)
- }
- return epsd.executeWithEvents(nil, f, nil)
-}
-
-// Unregister unregister the instance and drop ServiceInstancePreUnregisteredEvent and ServiceInstanceUnregisteredEvent
-func (epsd *EventPublishingServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
- f := func() error {
- return epsd.serviceDiscovery.Unregister(instance)
- }
- return epsd.executeWithEvents(NewServiceInstancePreUnregisteredEvent(epsd.serviceDiscovery, instance),
- f, NewServiceInstanceUnregisteredEvent(epsd.serviceDiscovery, instance))
-}
-
-// GetDefaultPageSize returns the result of serviceDiscovery.GetDefaultPageSize
-func (epsd *EventPublishingServiceDiscovery) GetDefaultPageSize() int {
- return epsd.serviceDiscovery.GetDefaultPageSize()
-}
-
-// GetServices returns the result of serviceDiscovery.GetServices
-func (epsd *EventPublishingServiceDiscovery) GetServices() *gxset.HashSet {
- return epsd.serviceDiscovery.GetServices()
-}
-
-// GetInstances returns the result of serviceDiscovery.GetInstances
-func (epsd *EventPublishingServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
- return epsd.serviceDiscovery.GetInstances(serviceName)
-}
-
-// GetInstancesByPage returns the result of serviceDiscovery.GetInstancesByPage
-func (epsd *EventPublishingServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
- return epsd.serviceDiscovery.GetInstancesByPage(serviceName, offset, pageSize)
-}
-
-// GetHealthyInstancesByPage returns the result of serviceDiscovery.GetHealthyInstancesByPage
-func (epsd *EventPublishingServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager {
- return epsd.serviceDiscovery.GetHealthyInstancesByPage(serviceName, offset, pageSize, healthy)
-}
-
-// GetRequestInstances returns result from serviceDiscovery.GetRequestInstances
-func (epsd *EventPublishingServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
- return epsd.serviceDiscovery.GetRequestInstances(serviceNames, offset, requestedSize)
-}
-
-// AddListener add event listener
-func (epsd *EventPublishingServiceDiscovery) AddListener(listener registry.ServiceInstancesChangedListener) error {
- extension.GetGlobalDispatcher().AddEventListener(listener)
- return epsd.serviceDiscovery.AddListener(listener)
-}
-
-// DispatchEventByServiceName pass serviceName to serviceDiscovery
-func (epsd *EventPublishingServiceDiscovery) DispatchEventByServiceName(serviceName string) error {
- return epsd.serviceDiscovery.DispatchEventByServiceName(serviceName)
-}
-
-// DispatchEventForInstances pass params to serviceDiscovery
-func (epsd *EventPublishingServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
- return epsd.serviceDiscovery.DispatchEventForInstances(serviceName, instances)
-}
-
-// DispatchEvent pass the event to serviceDiscovery
-func (epsd *EventPublishingServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
- return epsd.serviceDiscovery.DispatchEvent(event)
-}
-
-// executeWithEvents dispatch before event and after event if return error will dispatch exception event
-func (epsd *EventPublishingServiceDiscovery) executeWithEvents(beforeEvent observer.Event, f func() error, afterEvent observer.Event) error {
- globalDispatcher := extension.GetGlobalDispatcher()
- if beforeEvent != nil {
- globalDispatcher.Dispatch(beforeEvent)
- }
- if err := f(); err != nil {
- globalDispatcher.Dispatch(NewServiceDiscoveryExceptionEvent(epsd, epsd.serviceDiscovery, err))
- return err
- }
- if afterEvent != nil {
- globalDispatcher.Dispatch(afterEvent)
- }
- return nil
-}
-
-// getMetadataService returns metadata service instance
-func getMetadataService() (service.MetadataService, error) {
- return local.GetLocalMetadataService()
-}
diff --git a/registry/event/log_event_listener.go b/registry/event/log_event_listener.go
deleted file mode 100644
index 9ccfcdc..0000000
--- a/registry/event/log_event_listener.go
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package event
-
-import (
- "reflect"
- "sync"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common/extension"
- "dubbo.apache.org/dubbo-go/v3/common/logger"
- "dubbo.apache.org/dubbo-go/v3/common/observer"
-)
-
-func init() {
- extension.AddEventListener(GetLogEventListener)
-}
-
-// logEventListener is singleton
-type logEventListener struct{}
-
-func (l *logEventListener) GetPriority() int {
- return 0
-}
-
-func (l *logEventListener) OnEvent(e observer.Event) error {
- logger.Info("Event happen: " + e.String())
- return nil
-}
-
-func (l *logEventListener) GetEventType() reflect.Type {
- return reflect.TypeOf(&observer.BaseEvent{})
-}
-
-var (
- logEventListenerInstance *logEventListener
- logEventListenerOnce sync.Once
-)
-
-func GetLogEventListener() observer.EventListener {
- logEventListenerOnce.Do(func() {
- logEventListenerInstance = &logEventListener{}
- })
- return logEventListenerInstance
-}
diff --git a/registry/event/log_event_listener_test.go b/registry/event/log_event_listener_test.go
deleted file mode 100644
index 3136564..0000000
--- a/registry/event/log_event_listener_test.go
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package event
-
-import (
- "testing"
-)
-
-import (
- "github.com/stretchr/testify/assert"
-)
-
-func TestLogEventListener(t *testing.T) {
- l := &logEventListener{}
- assert.Equal(t, 0, l.GetPriority())
- assert.Nil(t, l.OnEvent(&ServiceDiscoveryDestroyedEvent{}))
-}
diff --git a/registry/event/metadata_service_url_params_customizer.go b/registry/event/metadata_service_url_params_customizer.go
index bfe7fb7..53e79b4 100644
--- a/registry/event/metadata_service_url_params_customizer.go
+++ b/registry/event/metadata_service_url_params_customizer.go
@@ -30,6 +30,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
+ "dubbo.apache.org/dubbo-go/v3/metadata/service/local"
"dubbo.apache.org/dubbo-go/v3/registry"
)
@@ -54,7 +55,7 @@ func (m *metadataServiceURLParamsMetadataCustomizer) GetPriority() int {
}
func (m *metadataServiceURLParamsMetadataCustomizer) Customize(instance registry.ServiceInstance) {
- ms, err := getMetadataService()
+ ms, err := local.GetLocalMetadataService()
if err != nil {
logger.Errorf("could not find the metadata service", err)
return
diff --git a/registry/event/protocol_ports_metadata_customizer.go b/registry/event/protocol_ports_metadata_customizer.go
index 5a6e9ca..058c0b9 100644
--- a/registry/event/protocol_ports_metadata_customizer.go
+++ b/registry/event/protocol_ports_metadata_customizer.go
@@ -26,6 +26,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
+ "dubbo.apache.org/dubbo-go/v3/metadata/service/local"
"dubbo.apache.org/dubbo-go/v3/registry"
)
@@ -44,7 +45,7 @@ func (p *ProtocolPortsMetadataCustomizer) GetPriority() int {
// Customize put the the string like [{"protocol": "dubbo", "port": 123}] into instance's metadata
func (p *ProtocolPortsMetadataCustomizer) Customize(instance registry.ServiceInstance) {
- metadataService, err := getMetadataService()
+ metadataService, err := local.GetLocalMetadataService()
if err != nil {
logger.Errorf("Could not init the MetadataService", err)
return
diff --git a/registry/event/service_config_exported_event.go b/registry/event/service_config_exported_event.go
deleted file mode 100644
index af566e3..0000000
--- a/registry/event/service_config_exported_event.go
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package event
-
-import (
- "time"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common/observer"
- "dubbo.apache.org/dubbo-go/v3/config"
-)
-
-// ServiceConfigExportedEvent represents an service was exported
-type ServiceConfigExportedEvent struct {
- observer.BaseEvent
- ServiceConfig *config.ServiceConfig
-}
-
-// NewServiceConfigExportedEvent create an instance
-func NewServiceConfigExportedEvent(serviceConfig *config.ServiceConfig) *ServiceConfigExportedEvent {
- return &ServiceConfigExportedEvent{
- BaseEvent: observer.BaseEvent{
- Source: serviceConfig,
- Timestamp: time.Now(),
- },
- ServiceConfig: serviceConfig,
- }
-}
diff --git a/registry/event/service_discovery_event.go b/registry/event/service_discovery_event.go
deleted file mode 100644
index 8235294..0000000
--- a/registry/event/service_discovery_event.go
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package event
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common/observer"
- "dubbo.apache.org/dubbo-go/v3/registry"
-)
-
-// ServiceDiscoveryEvent means that something happens to service discovery instance
-type ServiceDiscoveryEvent struct {
- observer.BaseEvent
- original registry.ServiceDiscovery
-}
-
-// NewServiceDiscoveryEvent returns an instance
-func NewServiceDiscoveryEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryEvent {
- return &ServiceDiscoveryEvent{
- BaseEvent: *observer.NewBaseEvent(discovery),
- original: original,
- }
-}
-
-// GetServiceDiscovery returns the event source
-func (sde *ServiceDiscoveryEvent) GetServiceDiscovery() registry.ServiceDiscovery {
- return sde.GetSource().(registry.ServiceDiscovery)
-}
-
-// GetOriginal actually I think we can remove this method.
-func (sde *ServiceDiscoveryEvent) GetOriginal() registry.ServiceDiscovery {
- return sde.original
-}
-
-// ServiceDiscoveryDestroyingEvent
-// this event will be dispatched before service discovery be destroyed
-type ServiceDiscoveryDestroyingEvent struct {
- ServiceDiscoveryEvent
-}
-
-// ServiceDiscoveryExceptionEvent
-// this event will be dispatched when the error occur in service discovery
-type ServiceDiscoveryExceptionEvent struct {
- ServiceDiscoveryEvent
- err error
-}
-
-// ServiceDiscoveryInitializedEvent
-// this event will be dispatched after service discovery initialize
-type ServiceDiscoveryInitializedEvent struct {
- ServiceDiscoveryEvent
-}
-
-// ServiceDiscoveryInitializingEvent
-// this event will be dispatched before service discovery initialize
-type ServiceDiscoveryInitializingEvent struct {
- ServiceDiscoveryEvent
-}
-
-// ServiceDiscoveryDestroyedEvent
-// this event will be dispatched after service discovery be destroyed
-type ServiceDiscoveryDestroyedEvent struct {
- ServiceDiscoveryEvent
-}
-
-// NewServiceDiscoveryDestroyingEvent create a ServiceDiscoveryDestroyingEvent
-func NewServiceDiscoveryDestroyingEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryDestroyingEvent {
- return &ServiceDiscoveryDestroyingEvent{*NewServiceDiscoveryEvent(discovery, original)}
-}
-
-// NewServiceDiscoveryExceptionEvent create a ServiceDiscoveryExceptionEvent
-func NewServiceDiscoveryExceptionEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery, err error) *ServiceDiscoveryExceptionEvent {
- return &ServiceDiscoveryExceptionEvent{*NewServiceDiscoveryEvent(discovery, original), err}
-}
-
-// NewServiceDiscoveryInitializedEvent create a ServiceDiscoveryInitializedEvent
-func NewServiceDiscoveryInitializedEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryInitializedEvent {
- return &ServiceDiscoveryInitializedEvent{*NewServiceDiscoveryEvent(discovery, original)}
-}
-
-// NewServiceDiscoveryInitializingEvent create a ServiceDiscoveryInitializingEvent
-func NewServiceDiscoveryInitializingEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryInitializingEvent {
- return &ServiceDiscoveryInitializingEvent{*NewServiceDiscoveryEvent(discovery, original)}
-}
-
-// NewServiceDiscoveryDestroyedEvent create a ServiceDiscoveryDestroyedEvent
-func NewServiceDiscoveryDestroyedEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryDestroyedEvent {
- return &ServiceDiscoveryDestroyedEvent{*NewServiceDiscoveryEvent(discovery, original)}
-}
diff --git a/registry/event/service_instance_event.go b/registry/event/service_instance_event.go
deleted file mode 100644
index bac8029..0000000
--- a/registry/event/service_instance_event.go
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package event
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common/observer"
- "dubbo.apache.org/dubbo-go/v3/registry"
-)
-
-// ServiceInstanceEvent means something happen to this ServiceInstance
-// like register this service instance
-type ServiceInstanceEvent struct {
- observer.BaseEvent
- serviceInstance registry.ServiceInstance
-}
-
-// NewServiceInstanceEvent create a ServiceInstanceEvent
-func NewServiceInstanceEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstanceEvent {
- return &ServiceInstanceEvent{
- BaseEvent: *observer.NewBaseEvent(source),
- serviceInstance: instance,
- }
-}
-
-// getServiceInstance return the service instance
-func (sie *ServiceInstanceEvent) getServiceInstance() registry.ServiceInstance {
- return sie.serviceInstance
-}
-
-// ServiceInstancePreRegisteredEvent
-// this event will be dispatched before service instance be registered
-type ServiceInstancePreRegisteredEvent struct {
- ServiceInstanceEvent
-}
-
-// ServiceInstancePreUnregisteredEvent
-// this event will be dispatched before service instance be unregistered
-type ServiceInstancePreUnregisteredEvent struct {
- ServiceInstanceEvent
-}
-
-// ServiceInstanceRegisteredEvent
-// this event will be dispatched after service instance be registered
-type ServiceInstanceRegisteredEvent struct {
- ServiceInstanceEvent
-}
-
-// ServiceInstanceRegisteredEvent
-// this event will be dispatched after service instance be unregistered
-type ServiceInstanceUnregisteredEvent struct {
- ServiceInstanceEvent
-}
-
-// NewServiceInstancePreRegisteredEvent create a ServiceInstancePreRegisteredEvent
-func NewServiceInstancePreRegisteredEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstancePreRegisteredEvent {
- return &ServiceInstancePreRegisteredEvent{*NewServiceInstanceEvent(source, instance)}
-}
-
-// NewServiceInstancePreUnregisteredEvent create a ServiceInstancePreUnregisteredEvent
-func NewServiceInstancePreUnregisteredEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstancePreUnregisteredEvent {
- return &ServiceInstancePreUnregisteredEvent{*NewServiceInstanceEvent(source, instance)}
-}
-
-// NewServiceInstanceRegisteredEvent create a ServiceInstanceRegisteredEvent
-func NewServiceInstanceRegisteredEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstanceRegisteredEvent {
- return &ServiceInstanceRegisteredEvent{*NewServiceInstanceEvent(source, instance)}
-}
-
-// NewServiceInstanceUnregisteredEvent create a ServiceInstanceUnregisteredEvent
-func NewServiceInstanceUnregisteredEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstanceUnregisteredEvent {
- return &ServiceInstanceUnregisteredEvent{*NewServiceInstanceEvent(source, instance)}
-}
diff --git a/registry/event/service_name_mapping_listener.go b/registry/event/service_name_mapping_listener.go
deleted file mode 100644
index e7d979a..0000000
--- a/registry/event/service_name_mapping_listener.go
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package event
-
-import (
- "reflect"
- "sync"
-)
-
-import (
- perrors "github.com/pkg/errors"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common/constant"
- "dubbo.apache.org/dubbo-go/v3/common/extension"
- "dubbo.apache.org/dubbo-go/v3/common/observer"
- "dubbo.apache.org/dubbo-go/v3/metadata/mapping"
-)
-
-func init() {
- extension.AddEventListener(GetServiceNameMappingListener)
-}
-
-// serviceNameMappingListener listen to service name mapping event
-// usually it means that we exported some service
-// it's a singleton
-type serviceNameMappingListener struct {
- nameMapping mapping.ServiceNameMapping
-}
-
-// GetPriority return 3, which ensure that this listener will be invoked after log listener
-func (s *serviceNameMappingListener) GetPriority() int {
- return 3
-}
-
-// OnEvent only handle ServiceConfigExportedEvent
-func (s *serviceNameMappingListener) OnEvent(e observer.Event) error {
- if ex, ok := e.(*ServiceConfigExportedEvent); ok {
- sc := ex.ServiceConfig
- urls := sc.GetExportedUrls()
-
- for _, u := range urls {
- err := s.nameMapping.Map(u.GetParam(constant.INTERFACE_KEY, ""),
- u.GetParam(constant.GROUP_KEY, ""),
- u.GetParam(constant.Version, ""),
- u.Protocol)
- if err != nil {
- return perrors.WithMessage(err, "could not map the service: "+u.String())
- }
- }
- }
- return nil
-}
-
-// GetEventType return ServiceConfigExportedEvent
-func (s *serviceNameMappingListener) GetEventType() reflect.Type {
- return reflect.TypeOf(&ServiceConfigExportedEvent{})
-}
-
-var (
- serviceNameMappingListenerInstance *serviceNameMappingListener
- serviceNameMappingListenerOnce sync.Once
-)
-
-// GetServiceNameMappingListener returns an instance
-func GetServiceNameMappingListener() observer.EventListener {
- serviceNameMappingListenerOnce.Do(func() {
- serviceNameMappingListenerInstance = &serviceNameMappingListener{
- nameMapping: extension.GetGlobalServiceNameMapping(),
- }
- })
- return serviceNameMappingListenerInstance
-}
diff --git a/registry/event/service_revision_customizer.go b/registry/event/service_revision_customizer.go
index 3acf655..77e022c 100644
--- a/registry/event/service_revision_customizer.go
+++ b/registry/event/service_revision_customizer.go
@@ -28,6 +28,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
+ "dubbo.apache.org/dubbo-go/v3/metadata/service/local"
"dubbo.apache.org/dubbo-go/v3/registry"
)
@@ -47,7 +48,7 @@ func (e *exportedServicesRevisionMetadataCustomizer) GetPriority() int {
// Customize calculate the revision for exported urls and then put it into instance metadata
func (e *exportedServicesRevisionMetadataCustomizer) Customize(instance registry.ServiceInstance) {
- ms, err := getMetadataService()
+ ms, err := local.GetLocalMetadataService()
if err != nil {
logger.Errorf("could not get metadata service", err)
return
@@ -74,7 +75,7 @@ func (e *subscribedServicesRevisionMetadataCustomizer) GetPriority() int {
// Customize calculate the revision for subscribed urls and then put it into instance metadata
func (e *subscribedServicesRevisionMetadataCustomizer) Customize(instance registry.ServiceInstance) {
- ms, err := getMetadataService()
+ ms, err := local.GetLocalMetadataService()
if err != nil {
logger.Errorf("could not get metadata service", err)
return
diff --git a/registry/file/service_discovery.go b/registry/file/service_discovery.go
index b6c7a31..015a45e 100644
--- a/registry/file/service_discovery.go
+++ b/registry/file/service_discovery.go
@@ -268,20 +268,3 @@ func (fssd *fileSystemServiceDiscovery) AddListener(listener registry.ServiceIns
// fssd.dynamicConfiguration.AddListener(listener.ServiceName)
return nil
}
-
-// DispatchEventByServiceName dispatches the ServiceInstancesChangedEvent to service instance whose name is serviceName
-func (fssd *fileSystemServiceDiscovery) DispatchEventByServiceName(serviceName string) error {
- return fssd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, fssd.GetInstances(serviceName)))
-}
-
-// DispatchEventForInstances dispatches the ServiceInstancesChangedEvent to target instances
-func (fssd *fileSystemServiceDiscovery) DispatchEventForInstances(serviceName string,
- instances []registry.ServiceInstance) error {
- return fssd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
-}
-
-// DispatchEvent dispatches the event
-func (fssd *fileSystemServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
- extension.GetGlobalDispatcher().Dispatch(event)
- return nil
-}
diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go
index 57d4bec..2328b4b 100644
--- a/registry/nacos/service_discovery.go
+++ b/registry/nacos/service_discovery.go
@@ -62,6 +62,9 @@ type nacosServiceDiscovery struct {
namingClient *nacosClient.NacosNamingClient
// cache registry instances
registryInstances []registry.ServiceInstance
+
+ instanceListenerMap map[string]*gxset.HashSet
+ listenerLock sync.Mutex
}
// Destroy will close the service discovery.
@@ -210,8 +213,30 @@ func (n *nacosServiceDiscovery) GetRequestInstances(serviceNames []string, offse
return res
}
+// registerInstanceListener adds listener to the per-service listener sets while holding listenerLock.
+// Service names that are not strings are logged and skipped.
+func (n *nacosServiceDiscovery) registerInstanceListener(listener registry.ServiceInstancesChangedListener) {
+	n.listenerLock.Lock()
+	defer n.listenerLock.Unlock()
+
+	for _, t := range listener.GetServiceNames().Values() {
+		serviceName, ok := t.(string)
+		if !ok {
+			logger.Errorf("service name error %s", t)
+			continue
+		}
+		if listenerSet, found := n.instanceListenerMap[serviceName]; found {
+			listenerSet.Add(listener)
+			continue
+		}
+		// gxset.NewSet already stores its arguments, so no separate Add is needed here.
+		n.instanceListenerMap[serviceName] = gxset.NewSet(listener)
+	}
+}
+
// AddListener will add a listener
func (n *nacosServiceDiscovery) AddListener(listener registry.ServiceInstancesChangedListener) error {
+ n.registerInstanceListener(listener)
for _, t := range listener.GetServiceNames().Values() {
serviceName := t.(string)
err := n.namingClient.Client().Subscribe(&vo.SubscribeParam{
@@ -241,7 +266,13 @@ func (n *nacosServiceDiscovery) AddListener(listener registry.ServiceInstancesCh
})
}
- e := n.DispatchEventForInstances(serviceName, instances)
+ var e error
+ for _, lis := range n.instanceListenerMap[serviceName].Values() {
+ var instanceListener registry.ServiceInstancesChangedListener
+ instanceListener = lis.(registry.ServiceInstancesChangedListener)
+ e = instanceListener.OnEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
+ }
+
if e != nil {
logger.Errorf("Dispatching event got exception, service name: %s, err: %v", serviceName, err)
}
@@ -254,22 +285,6 @@ func (n *nacosServiceDiscovery) AddListener(listener registry.ServiceInstancesCh
return nil
}
-// DispatchEventByServiceName will dispatch the event for the service with the service name
-func (n *nacosServiceDiscovery) DispatchEventByServiceName(serviceName string) error {
- return n.DispatchEventForInstances(serviceName, n.GetInstances(serviceName))
-}
-
-// DispatchEventForInstances will dispatch the event to those instances
-func (n *nacosServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
- return n.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
-}
-
-// DispatchEvent will dispatch the event
-func (n *nacosServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
- extension.GetGlobalDispatcher().Dispatch(event)
- return nil
-}
-
// toRegisterInstance convert the ServiceInstance to RegisterInstanceParam
// the Ephemeral will be true
func (n *nacosServiceDiscovery) toRegisterInstance(instance registry.ServiceInstance) vo.RegisterInstanceParam {
@@ -347,10 +362,11 @@ func newNacosServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
descriptor := fmt.Sprintf("nacos-service-discovery[%s]", rc.Address)
newInstance := &nacosServiceDiscovery{
- group: group,
- namingClient: client,
- descriptor: descriptor,
- registryInstances: []registry.ServiceInstance{},
+ group: group,
+ namingClient: client,
+ descriptor: descriptor,
+ registryInstances: []registry.ServiceInstance{},
+ instanceListenerMap: make(map[string]*gxset.HashSet),
}
instanceMap[name] = newInstance
return newInstance, nil
diff --git a/registry/nacos/service_discovery_test.go b/registry/nacos/service_discovery_test.go
index 03b1b9c..1c438c5 100644
--- a/registry/nacos/service_discovery_test.go
+++ b/registry/nacos/service_discovery_test.go
@@ -32,8 +32,6 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
- "dubbo.apache.org/dubbo-go/v3/common/observer"
- "dubbo.apache.org/dubbo-go/v3/common/observer/dispatcher"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/metadata/mapping"
"dubbo.apache.org/dubbo-go/v3/registry"
@@ -75,15 +73,11 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) {
return
}
prepareData()
- extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
- return dispatcher.NewMockEventDispatcher()
- })
extension.SetGlobalServiceNameMapping(func() mapping.ServiceNameMapping {
return mapping.NewMockServiceNameMapping()
})
- extension.SetAndInitGlobalDispatcher("mock")
rand.Seed(time.Now().Unix())
serviceName := "service-name" + strconv.Itoa(rand.Intn(10000))
id := "id"
@@ -153,8 +147,6 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) {
assert.Equal(t, "b", v)
// test dispatcher event
- err = serviceDiscovery.DispatchEventByServiceName(serviceName)
- assert.Nil(t, err)
hs := gxset.NewSet()
hs.Add(serviceName)
// test AddListener
diff --git a/registry/service_discovery.go b/registry/service_discovery.go
index 70b7b76..c90ff11 100644
--- a/registry/service_discovery.go
+++ b/registry/service_discovery.go
@@ -75,13 +75,4 @@ type ServiceDiscovery interface {
// AddListener adds a new ServiceInstancesChangedListenerImpl
// see addServiceInstancesChangedListener in Java
AddListener(listener ServiceInstancesChangedListener) error
-
- // DispatchEventByServiceName dispatches the ServiceInstancesChangedEvent to service instance whose name is serviceName
- DispatchEventByServiceName(serviceName string) error
-
- // DispatchEventForInstances dispatches the ServiceInstancesChangedEvent to target instances
- DispatchEventForInstances(serviceName string, instances []ServiceInstance) error
-
- // DispatchEvent dispatches the event
- DispatchEvent(event *ServiceInstancesChangedEvent) error
}
diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go
index 1feb8f1..40a0bf3 100644
--- a/registry/servicediscovery/service_discovery_registry.go
+++ b/registry/servicediscovery/service_discovery_registry.go
@@ -134,7 +134,7 @@ func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
if err != nil {
return nil, perrors.WithMessage(err, "Create service discovery fialed")
}
- return event.NewEventPublishingServiceDiscovery(originServiceDiscovery), nil
+ return originServiceDiscovery, nil
}
func parseServices(literalServices string) *gxset.HashSet {
@@ -235,20 +235,12 @@ func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.No
}
s.serviceListeners[serviceNamesKey] = listener
listener.AddListenerAndNotify(protocolServiceKey, notify)
- s.registerServiceInstancesChangedListener(url, listener)
- return nil
-}
-func (s *serviceDiscoveryRegistry) registerServiceInstancesChangedListener(url *common.URL, listener registry.ServiceInstancesChangedListener) {
- // FIXME ServiceNames.String() is not good
- listenerId := listener.GetServiceNames().String() + ":" + getUrlKey(url)
- if !s.subscribedServices.Contains(listenerId) {
- err := s.serviceDiscovery.AddListener(listener)
- if err != nil {
- logger.Errorf("add listener[%s] catch error,url:%s err:%s", listenerId, url.String(), err.Error())
- }
+ err = s.serviceDiscovery.AddListener(listener)
+ if err != nil {
+ logger.Errorf("add instance listener catch error,url:%s err:%s", url.String(), err.Error())
}
-
+ return nil
}
func getUrlKey(url *common.URL) string {
@@ -352,5 +344,26 @@ func tryInitMetadataService(url *common.URL) {
if err != nil {
logger.Errorf("could not export the metadata service", err)
}
- extension.GetGlobalDispatcher().Dispatch(event.NewServiceConfigExportedEvent(expt.(*configurable.MetadataServiceExporter).ServiceConfig))
+
+ // report interface-app mapping
+ err = publishMapping(expt.(*configurable.MetadataServiceExporter).ServiceConfig)
+ if err != nil {
+ logger.Errorf("Publish interface-application mapping failed", err)
+ }
+}
+
+// publishMapping maps every URL exported by sc (interface, group, version, protocol) through the global service name mapping; it returns the first mapping error wrapped with the URL, or nil.
+func publishMapping(sc *config.ServiceConfig) error {
+ urls := sc.GetExportedUrls()
+
+ for _, u := range urls {
+ err := extension.GetGlobalServiceNameMapping().Map(u.GetParam(constant.INTERFACE_KEY, ""),
+ u.GetParam(constant.GROUP_KEY, ""),
+ u.GetParam(constant.Version, ""),
+ u.Protocol)
+ if err != nil {
+ return perrors.WithMessage(err, "could not map the service: "+u.String())
+ }
+ }
+ return nil
+}
diff --git a/registry/servicediscovery/service_discovery_registry_test.go b/registry/servicediscovery/service_discovery_registry_test.go
index 8c92642..d8b8445 100644
--- a/registry/servicediscovery/service_discovery_registry_test.go
+++ b/registry/servicediscovery/service_discovery_registry_test.go
@@ -30,8 +30,6 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/extension"
- "dubbo.apache.org/dubbo-go/v3/common/observer"
- "dubbo.apache.org/dubbo-go/v3/common/observer/dispatcher"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/metadata/mapping"
"dubbo.apache.org/dubbo-go/v3/metadata/service"
@@ -59,11 +57,6 @@ func TestServiceDiscoveryRegistry_Register(t *testing.T) {
return mapping.NewMockServiceNameMapping()
})
- extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
- return dispatcher.NewMockEventDispatcher()
- })
- extension.SetAndInitGlobalDispatcher("mock")
-
config.GetBaseConfig().ServiceDiscoveries["mock"] = &config.ServiceDiscoveryConfig{
Protocol: "mock",
}
diff --git a/registry/zookeeper/service_discovery.go b/registry/zookeeper/service_discovery.go
index 15cf134..b1909ae 100644
--- a/registry/zookeeper/service_discovery.go
+++ b/registry/zookeeper/service_discovery.go
@@ -64,13 +64,14 @@ type zookeeperServiceDiscovery struct {
client *gxzookeeper.ZookeeperClient
csd *curator_discovery.ServiceDiscovery
// listener *zookeeper.ZkEventListener
- url *common.URL
- wg sync.WaitGroup
- cltLock sync.Mutex
- listenLock sync.Mutex
- done chan struct{}
- rootPath string
- listenNames []string
+ url *common.URL
+ wg sync.WaitGroup
+ cltLock sync.Mutex
+ listenLock sync.Mutex
+ done chan struct{}
+ rootPath string
+ listenNames []string
+ instanceListenerMap map[string]*gxset.HashSet
}
// newZookeeperServiceDiscovery the constructor of newZookeeperServiceDiscovery
@@ -105,8 +106,9 @@ func newZookeeperServiceDiscovery(name string) (registry.ServiceDiscovery, error
common.WithParamsValue(constant.CONFIG_TIMEOUT_KEY, remoteConfig.TimeoutStr))
url.Location = remoteConfig.Address
zksd := &zookeeperServiceDiscovery{
- url: url,
- rootPath: rootPath,
+ url: url,
+ rootPath: rootPath,
+ instanceListenerMap: make(map[string]*gxset.HashSet),
}
err := zookeeper.ValidateZookeeperClient(zksd, url.Location)
if err != nil {
@@ -272,6 +274,7 @@ func (zksd *zookeeperServiceDiscovery) GetRequestInstances(serviceNames []string
func (zksd *zookeeperServiceDiscovery) AddListener(listener registry.ServiceInstancesChangedListener) error {
zksd.listenLock.Lock()
defer zksd.listenLock.Unlock()
+
for _, t := range listener.GetServiceNames().Values() {
serviceName, ok := t.(string)
if !ok {
@@ -279,23 +282,24 @@ func (zksd *zookeeperServiceDiscovery) AddListener(listener registry.ServiceInst
continue
}
zksd.listenNames = append(zksd.listenNames, serviceName)
- zksd.csd.ListenServiceEvent(serviceName, zksd)
+ listenerSet, found := zksd.instanceListenerMap[serviceName]
+ if !found {
+ listenerSet = gxset.NewSet(listener)
+ listenerSet.Add(listener)
+ zksd.instanceListenerMap[serviceName] = listenerSet
+ } else {
+ listenerSet.Add(listener)
+ }
}
- return nil
-}
-func (zksd *zookeeperServiceDiscovery) DispatchEventByServiceName(serviceName string) error {
- return zksd.DispatchEventForInstances(serviceName, zksd.GetInstances(serviceName))
-}
-
-// DispatchEventForInstances dispatch ServiceInstancesChangedEvent
-func (zksd *zookeeperServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
- return zksd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
-}
-
-// nolint
-func (zksd *zookeeperServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
- extension.GetGlobalDispatcher().Dispatch(event)
+ for _, t := range listener.GetServiceNames().Values() {
+ serviceName, ok := t.(string)
+ if !ok {
+ logger.Errorf("service name error %s", t)
+ continue
+ }
+ zksd.csd.ListenServiceEvent(serviceName, zksd)
+ }
return nil
}
@@ -306,7 +310,15 @@ func (zksd *zookeeperServiceDiscovery) DataChange(eventType remoting.Event) bool
path = strings.TrimPrefix(path, constant.PATH_SEPARATOR)
// get service name in zk path
serviceName := strings.Split(path, constant.PATH_SEPARATOR)[0]
- err := zksd.DispatchEventByServiceName(serviceName)
+
+ var err error
+ instances := zksd.GetInstances(serviceName)
+ for _, lis := range zksd.instanceListenerMap[serviceName].Values() {
+ var instanceListener registry.ServiceInstancesChangedListener
+ instanceListener = lis.(registry.ServiceInstancesChangedListener)
+ err = instanceListener.OnEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
+ }
+
if err != nil {
logger.Errorf("[zkServiceDiscovery] DispatchEventByServiceName{%s} error = err{%v}", serviceName, err)
return false