You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by li...@apache.org on 2020/12/09 15:12:19 UTC
[skywalking-satellite] 02/02: Aggregate registry & fix some comments
This is an automated email from the ASF dual-hosted git repository.
liujiapeng pushed a commit to branch enhance-plugin-system
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
commit 8d443d14b5e73a1793bffb6c21639db9a234ed2a
Author: Evan <ev...@outlook.com>
AuthorDate: Wed Dec 9 23:11:49 2020 +0800
Aggregate registry & fix some comments
---
internal/pkg/plugin/definition.go | 9 +++++++
internal/pkg/plugin/plugin_test.go | 2 +-
internal/pkg/plugin/registry.go | 46 ++++++++++++++++--------------------
plugins/client/api/client.go | 2 +-
plugins/collector/api/collector.go | 19 +++------------
plugins/fallbacker/api/fallbacker.go | 4 ++--
plugins/filter/api/filter.go | 14 +++--------
plugins/forwarder/api/forwarder.go | 3 +--
plugins/parser/api/parser.go | 5 +---
plugins/queue/api/queue.go | 6 ++---
10 files changed, 43 insertions(+), 67 deletions(-)
diff --git a/internal/pkg/plugin/definition.go b/internal/pkg/plugin/definition.go
index bf73689..160ba6c 100644
--- a/internal/pkg/plugin/definition.go
+++ b/internal/pkg/plugin/definition.go
@@ -17,6 +17,8 @@
package plugin
+import "reflect"
+
// Plugin defines the plugin model in Satellite.
type Plugin interface {
// Name returns the name of the specific plugin.
@@ -33,3 +35,10 @@ type InitializingFunc func(plugin Plugin, config interface{})
// CallbackFunc would be invoked after initializing.
type CallbackFunc func(plugin Plugin)
+
+type RegInfo struct {
+ PluginType reflect.Type
+ NameFinder NameFinderFunc
+ Initializing InitializingFunc
+ Callback CallbackFunc
+}
diff --git a/internal/pkg/plugin/plugin_test.go b/internal/pkg/plugin/plugin_test.go
index a14eb3b..1030333 100644
--- a/internal/pkg/plugin/plugin_test.go
+++ b/internal/pkg/plugin/plugin_test.go
@@ -96,6 +96,6 @@ func TestPlugin(t *testing.T) {
}
func init() {
- RegisterPluginCategory(reflect.TypeOf((*DemoCategory)(nil)).Elem(), nil, nil, nil)
+ RegisterPluginCategory(&RegInfo{PluginType: reflect.TypeOf((*DemoCategory)(nil)).Elem()})
RegisterPlugin(&DemoPlugin{})
}
diff --git a/internal/pkg/plugin/registry.go b/internal/pkg/plugin/registry.go
index 9059d66..26cb589 100644
--- a/internal/pkg/plugin/registry.go
+++ b/internal/pkg/plugin/registry.go
@@ -18,6 +18,7 @@
package plugin
import (
+ "errors"
"fmt"
"reflect"
"sync"
@@ -25,18 +26,14 @@ import (
// the global plugin registry
var (
- lock sync.Mutex
- reg map[reflect.Type]map[string]reflect.Value
- initFuncReg map[reflect.Type]InitializingFunc
- callbackFuncReg map[reflect.Type]CallbackFunc
- nameFinderFuncReg map[reflect.Type]NameFinderFunc
+ lock sync.Mutex
+ reg map[reflect.Type]map[string]reflect.Value
+ meta map[reflect.Type]*RegInfo
)
func init() {
reg = make(map[reflect.Type]map[string]reflect.Value)
- initFuncReg = make(map[reflect.Type]InitializingFunc)
- callbackFuncReg = make(map[reflect.Type]CallbackFunc)
- nameFinderFuncReg = make(map[reflect.Type]NameFinderFunc)
+ meta = make(map[reflect.Type]*RegInfo)
}
// RegisterCategory register new plugin category with default InitializingFunc.
@@ -46,26 +43,23 @@ func init() {
// n: the plugin name finder,and the default value is defaultNameFinder
// i, the plugin initializer, and the default value is defaultInitializing
// c, the plugin initializer callback func, and the default value is defaultCallBack
-func RegisterPluginCategory(pluginCategory reflect.Type, n NameFinderFunc, i InitializingFunc, c CallbackFunc) {
+func RegisterPluginCategory(m *RegInfo) {
lock.Lock()
defer lock.Unlock()
- reg[pluginCategory] = map[string]reflect.Value{}
-
- if n == nil {
- nameFinderFuncReg[pluginCategory] = defaultNameFinder
- } else {
- nameFinderFuncReg[pluginCategory] = n
+ if m.PluginType == nil {
+ panic(errors.New("cannot register RegInfo because the PluginType is nil"))
+ }
+ if m.NameFinder == nil {
+ m.NameFinder = defaultNameFinder
}
- if i == nil {
- initFuncReg[pluginCategory] = defaultInitializing
- } else {
- initFuncReg[pluginCategory] = i
+ if m.Initializing == nil {
+ m.Initializing = defaultInitializing
}
- if c == nil {
- callbackFuncReg[pluginCategory] = defaultCallBack
- } else {
- callbackFuncReg[pluginCategory] = c
+ if m.Callback == nil {
+ m.Callback = defaultCallBack
}
+ reg[m.PluginType] = map[string]reflect.Value{}
+ meta[m.PluginType] = m
}
// RegisterPlugin registers the pluginType as plugin.
@@ -91,7 +85,7 @@ func RegisterPlugin(plugin Plugin) {
func Get(category reflect.Type, cfg interface{}) Plugin {
lock.Lock()
defer lock.Unlock()
- pluginName := nameFinderFuncReg[category](cfg)
+ pluginName := meta[category].NameFinder(cfg)
value, ok := reg[category][pluginName]
if !ok {
panic(fmt.Errorf("cannot find %s plugin, and the category of plugin is %s", pluginName, category))
@@ -102,7 +96,7 @@ func Get(category reflect.Type, cfg interface{}) Plugin {
}
plugin := reflect.New(t).Interface().(Plugin)
- initFuncReg[category](plugin, cfg)
- callbackFuncReg[category](plugin)
+ meta[category].Initializing(plugin, cfg)
+ meta[category].Callback(plugin)
return plugin
}
diff --git a/plugins/client/api/client.go b/plugins/client/api/client.go
index 33f61a6..5b584e2 100644
--- a/plugins/client/api/client.go
+++ b/plugins/client/api/client.go
@@ -46,5 +46,5 @@ func GetClient(config plugin.DefaultConfig) Client {
}
func init() {
- plugin.RegisterPluginCategory(reflect.TypeOf((*Client)(nil)).Elem(), nil, nil, nil)
+ plugin.RegisterPluginCategory(&plugin.RegInfo{PluginType: reflect.TypeOf((*Client)(nil)).Elem()})
}
diff --git a/plugins/collector/api/collector.go b/plugins/collector/api/collector.go
index a756974..1ad15b0 100644
--- a/plugins/collector/api/collector.go
+++ b/plugins/collector/api/collector.go
@@ -24,21 +24,10 @@ import (
"github.com/apache/skywalking-satellite/internal/pkg/plugin"
)
-// Init() Initial stage: Init plugin by config
-// ||
-// \/
-// Init() Preparing stage: Init the collector, such as build connection with SkyWalking javaagent.
-// ||
-// \/
-// Next() Running stage: When Collector collect a data, the data would be fetched by the upstream
-// || component through this method.
-// \/
-// Close() Closing stage: Close the Collector, such as close connection with SkyWalking javaagent.
-
// Collector is a plugin interface, that defines new collectors.
type Collector interface {
plugin.Plugin
- // Prepare creates a listener or reader to gather APM data.
+ // Prepare creates a listener or reader to gather APM data, such as building a connection with the SkyWalking Java agent.
Prepare() error
// Next return the data from the input.
EventChannel() <-chan event.SerializableEvent
@@ -46,13 +35,11 @@ type Collector interface {
Close() error
}
-var CollectorCategory = reflect.TypeOf((*Collector)(nil)).Elem()
-
// Get collector plugin.
func GetCollector(config plugin.DefaultConfig) Collector {
- return plugin.Get(CollectorCategory, config).(Collector)
+ return plugin.Get(reflect.TypeOf((*Collector)(nil)).Elem(), config).(Collector)
}
func init() {
- plugin.RegisterPluginCategory(CollectorCategory, nil, nil, nil)
+ plugin.RegisterPluginCategory(&plugin.RegInfo{PluginType: reflect.TypeOf((*Collector)(nil)).Elem()})
}
diff --git a/plugins/fallbacker/api/fallbacker.go b/plugins/fallbacker/api/fallbacker.go
index 91dea45..658b8c8 100644
--- a/plugins/fallbacker/api/fallbacker.go
+++ b/plugins/fallbacker/api/fallbacker.go
@@ -29,7 +29,7 @@ import (
type Fallbacker interface {
plugin.Plugin
// FallBack returns nil when finishing a successful process and returns a new Fallbacker when failure.
- FallBack(batch event.BatchEvents, connection interface{}, forward api.ForwardFunc, callback DisconnectionCallback) Fallbacker
+ FallBack(batch event.BatchEvents, connection interface{}, forward api.ForwardFunc)
}
type DisconnectionCallback func()
@@ -41,5 +41,5 @@ func GetFallbacker(config plugin.DefaultConfig) Fallbacker {
// init register the Fallbacker interface
func init() {
- plugin.RegisterPluginCategory(reflect.TypeOf((*Fallbacker)(nil)).Elem(), nil, nil, nil)
+ plugin.RegisterPluginCategory(&plugin.RegInfo{PluginType: reflect.TypeOf((*Fallbacker)(nil)).Elem()})
}
diff --git a/plugins/filter/api/filter.go b/plugins/filter/api/filter.go
index bc8e9bb..f755aa2 100644
--- a/plugins/filter/api/filter.go
+++ b/plugins/filter/api/filter.go
@@ -24,27 +24,19 @@ import (
"github.com/apache/skywalking-satellite/internal/pkg/plugin"
)
-// Init() Initiating stage: Init plugin by config
-// ||
-// \/
-// Process() Running stage: Process the input event to convert to new event. During the processing,
-// the method should also tag event type to mark the event category.
-
// Filter is a plugin interface, that defines new pipeline filters.
type Filter interface {
plugin.Plugin
- // Process would fetch the needed event
+ // Process puts the needed event into the OutputEventContext.
Process(context *event.OutputEventContext)
}
-var FilterCategory = reflect.TypeOf((*Filter)(nil)).Elem()
-
// Get filter plugin.
func GetFilter(config plugin.DefaultConfig) Filter {
- return plugin.Get(FilterCategory, config).(Filter)
+ return plugin.Get(reflect.TypeOf((*Filter)(nil)).Elem(), config).(Filter)
}
func init() {
- plugin.RegisterPluginCategory(FilterCategory, nil, nil, nil)
+ plugin.RegisterPluginCategory(&plugin.RegInfo{PluginType: reflect.TypeOf((*Filter)(nil)).Elem()})
}
diff --git a/plugins/forwarder/api/forwarder.go b/plugins/forwarder/api/forwarder.go
index b0509cd..b869159 100644
--- a/plugins/forwarder/api/forwarder.go
+++ b/plugins/forwarder/api/forwarder.go
@@ -27,7 +27,6 @@ import (
// Forwarder is a plugin interface, that defines new forwarders.
type Forwarder interface {
plugin.Plugin
-
// Forward the batch events to the external services, such as Kafka MQ and SkyWalking OAP cluster.
Forward(connection interface{}, batch event.BatchEvents) error
// ForwardType returns the supported event type.
@@ -44,5 +43,5 @@ func GetForwarder(config map[string]interface{}) Forwarder {
// init register the Forwarder interface
func init() {
- plugin.RegisterPluginCategory(reflect.TypeOf((*Forwarder)(nil)).Elem(), nil, nil, nil)
+ plugin.RegisterPluginCategory(&plugin.RegInfo{PluginType: reflect.TypeOf((*Forwarder)(nil)).Elem()})
}
diff --git a/plugins/parser/api/parser.go b/plugins/parser/api/parser.go
index cba3bca..5677216 100644
--- a/plugins/parser/api/parser.go
+++ b/plugins/parser/api/parser.go
@@ -24,9 +24,6 @@ import (
"github.com/apache/skywalking-satellite/internal/pkg/plugin"
)
-//
-// Collector ==> RawData ==> Parser ==> SerializableEvent
-//
// Parser is a plugin interface, that defines new Parsers for Collector plugin.
type Parser interface {
plugin.Plugin
@@ -43,5 +40,5 @@ func GetParser(pluginName string, config plugin.DefaultConfig) Parser {
}
func init() {
- plugin.RegisterPluginCategory(reflect.TypeOf((*Parser)(nil)).Elem(), nil, nil, nil)
+ plugin.RegisterPluginCategory(&plugin.RegInfo{PluginType: reflect.TypeOf((*Parser)(nil)).Elem()})
}
diff --git a/plugins/queue/api/queue.go b/plugins/queue/api/queue.go
index f6d4d69..9966326 100644
--- a/plugins/queue/api/queue.go
+++ b/plugins/queue/api/queue.go
@@ -56,12 +56,10 @@ type QueueConsumer interface {
Dequeue() (event event.SerializableEvent, offset int64, err error)
}
-var QueueCategory = reflect.TypeOf((*Queue)(nil)).Elem()
-
func GetQueue(config plugin.DefaultConfig) Queue {
- return plugin.Get(QueueCategory, config).(Queue)
+ return plugin.Get(reflect.TypeOf((*Queue)(nil)).Elem(), config).(Queue)
}
func init() {
- plugin.RegisterPluginCategory(QueueCategory, nil, nil, nil)
+ plugin.RegisterPluginCategory(&plugin.RegInfo{PluginType: reflect.TypeOf((*Queue)(nil)).Elem()})
}