You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by li...@apache.org on 2020/12/09 15:12:19 UTC

[skywalking-satellite] 02/02: Aggregate registry & fix some comments

This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a commit to branch enhance-plugin-system
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git

commit 8d443d14b5e73a1793bffb6c21639db9a234ed2a
Author: Evan <ev...@outlook.com>
AuthorDate: Wed Dec 9 23:11:49 2020 +0800

    Aggregate registry & fix some comments
---
 internal/pkg/plugin/definition.go    |  9 +++++++
 internal/pkg/plugin/plugin_test.go   |  2 +-
 internal/pkg/plugin/registry.go      | 46 ++++++++++++++++--------------------
 plugins/client/api/client.go         |  2 +-
 plugins/collector/api/collector.go   | 19 +++------------
 plugins/fallbacker/api/fallbacker.go |  4 ++--
 plugins/filter/api/filter.go         | 14 +++--------
 plugins/forwarder/api/forwarder.go   |  3 +--
 plugins/parser/api/parser.go         |  5 +---
 plugins/queue/api/queue.go           |  6 ++---
 10 files changed, 43 insertions(+), 67 deletions(-)

diff --git a/internal/pkg/plugin/definition.go b/internal/pkg/plugin/definition.go
index bf73689..160ba6c 100644
--- a/internal/pkg/plugin/definition.go
+++ b/internal/pkg/plugin/definition.go
@@ -17,6 +17,8 @@
 
 package plugin
 
+import "reflect"
+
 // Plugin defines the plugin model in Satellite.
 type Plugin interface {
 	// Name returns the name of the specific plugin.
@@ -33,3 +35,10 @@ type InitializingFunc func(plugin Plugin, config interface{})
 
 // CallbackFunc would be invoked after initializing.
 type CallbackFunc func(plugin Plugin)
+
+type RegInfo struct {
+	PluginType   reflect.Type
+	NameFinder   NameFinderFunc
+	Initializing InitializingFunc
+	Callback     CallbackFunc
+}
diff --git a/internal/pkg/plugin/plugin_test.go b/internal/pkg/plugin/plugin_test.go
index a14eb3b..1030333 100644
--- a/internal/pkg/plugin/plugin_test.go
+++ b/internal/pkg/plugin/plugin_test.go
@@ -96,6 +96,6 @@ func TestPlugin(t *testing.T) {
 }
 
 func init() {
-	RegisterPluginCategory(reflect.TypeOf((*DemoCategory)(nil)).Elem(), nil, nil, nil)
+	RegisterPluginCategory(&RegInfo{PluginType: reflect.TypeOf((*DemoCategory)(nil)).Elem()})
 	RegisterPlugin(&DemoPlugin{})
 }
diff --git a/internal/pkg/plugin/registry.go b/internal/pkg/plugin/registry.go
index 9059d66..26cb589 100644
--- a/internal/pkg/plugin/registry.go
+++ b/internal/pkg/plugin/registry.go
@@ -18,6 +18,7 @@
 package plugin
 
 import (
+	"errors"
 	"fmt"
 	"reflect"
 	"sync"
@@ -25,18 +26,14 @@ import (
 
 // the global plugin registry
 var (
-	lock              sync.Mutex
-	reg               map[reflect.Type]map[string]reflect.Value
-	initFuncReg       map[reflect.Type]InitializingFunc
-	callbackFuncReg   map[reflect.Type]CallbackFunc
-	nameFinderFuncReg map[reflect.Type]NameFinderFunc
+	lock sync.Mutex
+	reg  map[reflect.Type]map[string]reflect.Value
+	meta map[reflect.Type]*RegInfo
 )
 
 func init() {
 	reg = make(map[reflect.Type]map[string]reflect.Value)
-	initFuncReg = make(map[reflect.Type]InitializingFunc)
-	callbackFuncReg = make(map[reflect.Type]CallbackFunc)
-	nameFinderFuncReg = make(map[reflect.Type]NameFinderFunc)
+	meta = make(map[reflect.Type]*RegInfo)
 }
 
 // RegisterPluginCategory registers a new plugin category with default InitializingFunc.
@@ -46,26 +43,23 @@ func init() {
 // n: the plugin name finder, and the default value is defaultNameFinder
 // i: the plugin initializer, and the default value is defaultInitializing
 // c: the plugin initializer callback func, and the default value is defaultCallBack
-func RegisterPluginCategory(pluginCategory reflect.Type, n NameFinderFunc, i InitializingFunc, c CallbackFunc) {
+func RegisterPluginCategory(m *RegInfo) {
 	lock.Lock()
 	defer lock.Unlock()
-	reg[pluginCategory] = map[string]reflect.Value{}
-
-	if n == nil {
-		nameFinderFuncReg[pluginCategory] = defaultNameFinder
-	} else {
-		nameFinderFuncReg[pluginCategory] = n
+	if m.PluginType == nil {
+		panic(errors.New("cannot register RegInfo because the PluginType is nil"))
+	}
+	if m.NameFinder == nil {
+		m.NameFinder = defaultNameFinder
 	}
-	if i == nil {
-		initFuncReg[pluginCategory] = defaultInitializing
-	} else {
-		initFuncReg[pluginCategory] = i
+	if m.Initializing == nil {
+		m.Initializing = defaultInitializing
 	}
-	if c == nil {
-		callbackFuncReg[pluginCategory] = defaultCallBack
-	} else {
-		callbackFuncReg[pluginCategory] = c
+	if m.Callback == nil {
+		m.Callback = defaultCallBack
 	}
+	reg[m.PluginType] = map[string]reflect.Value{}
+	meta[m.PluginType] = m
 }
 
 // RegisterPlugin registers the pluginType as plugin.
@@ -91,7 +85,7 @@ func RegisterPlugin(plugin Plugin) {
 func Get(category reflect.Type, cfg interface{}) Plugin {
 	lock.Lock()
 	defer lock.Unlock()
-	pluginName := nameFinderFuncReg[category](cfg)
+	pluginName := meta[category].NameFinder(cfg)
 	value, ok := reg[category][pluginName]
 	if !ok {
 		panic(fmt.Errorf("cannot find %s plugin, and the category of plugin is %s", pluginName, category))
@@ -102,7 +96,7 @@ func Get(category reflect.Type, cfg interface{}) Plugin {
 	}
 
 	plugin := reflect.New(t).Interface().(Plugin)
-	initFuncReg[category](plugin, cfg)
-	callbackFuncReg[category](plugin)
+	meta[category].Initializing(plugin, cfg)
+	meta[category].Callback(plugin)
 	return plugin
 }
diff --git a/plugins/client/api/client.go b/plugins/client/api/client.go
index 33f61a6..5b584e2 100644
--- a/plugins/client/api/client.go
+++ b/plugins/client/api/client.go
@@ -46,5 +46,5 @@ func GetClient(config plugin.DefaultConfig) Client {
 }
 
 func init() {
-	plugin.RegisterPluginCategory(reflect.TypeOf((*Client)(nil)).Elem(), nil, nil, nil)
+	plugin.RegisterPluginCategory(&plugin.RegInfo{PluginType: reflect.TypeOf((*Client)(nil)).Elem()})
 }
diff --git a/plugins/collector/api/collector.go b/plugins/collector/api/collector.go
index a756974..1ad15b0 100644
--- a/plugins/collector/api/collector.go
+++ b/plugins/collector/api/collector.go
@@ -24,21 +24,10 @@ import (
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
 
-//   Init()     Initial stage: Init plugin by config
-//    ||
-//    \/
-//   Init()   Preparing stage: Init the collector, such as build connection with SkyWalking javaagent.
-//    ||
-//    \/
-//   Next()     Running stage: When Collector collect a data, the data would be fetched by the upstream
-//    ||                       component through this method.
-//    \/
-//   Close()    Closing stage: Close the Collector, such as close connection with SkyWalking javaagent.
-
 // Collector is a plugin interface, that defines new collectors.
 type Collector interface {
 	plugin.Plugin
-	// Prepare creates a listener or reader to gather APM data.
+	// Prepare creates a listener or reader to gather APM data, e.g. by building a connection with the SkyWalking Java agent.
 	Prepare() error
 	// EventChannel returns a receive-only channel of SerializableEvent values.
 	EventChannel() <-chan event.SerializableEvent
@@ -46,13 +35,11 @@ type Collector interface {
 	Close() error
 }
 
-var CollectorCategory = reflect.TypeOf((*Collector)(nil)).Elem()
-
 // Get collector plugin.
 func GetCollector(config plugin.DefaultConfig) Collector {
-	return plugin.Get(CollectorCategory, config).(Collector)
+	return plugin.Get(reflect.TypeOf((*Collector)(nil)).Elem(), config).(Collector)
 }
 
 func init() {
-	plugin.RegisterPluginCategory(CollectorCategory, nil, nil, nil)
+	plugin.RegisterPluginCategory(&plugin.RegInfo{PluginType: reflect.TypeOf((*Collector)(nil)).Elem()})
 }
diff --git a/plugins/fallbacker/api/fallbacker.go b/plugins/fallbacker/api/fallbacker.go
index 91dea45..658b8c8 100644
--- a/plugins/fallbacker/api/fallbacker.go
+++ b/plugins/fallbacker/api/fallbacker.go
@@ -29,7 +29,7 @@ import (
 type Fallbacker interface {
 	plugin.Plugin
 	//  FallBack returns nil when finishing a successful process and returns a new Fallbacker when failure.
-	FallBack(batch event.BatchEvents, connection interface{}, forward api.ForwardFunc, callback DisconnectionCallback) Fallbacker
+	FallBack(batch event.BatchEvents, connection interface{}, forward api.ForwardFunc)
 }
 
 type DisconnectionCallback func()
@@ -41,5 +41,5 @@ func GetFallbacker(config plugin.DefaultConfig) Fallbacker {
 
 // init register the Fallbacker interface
 func init() {
-	plugin.RegisterPluginCategory(reflect.TypeOf((*Fallbacker)(nil)).Elem(), nil, nil, nil)
+	plugin.RegisterPluginCategory(&plugin.RegInfo{PluginType: reflect.TypeOf((*Fallbacker)(nil)).Elem()})
 }
diff --git a/plugins/filter/api/filter.go b/plugins/filter/api/filter.go
index bc8e9bb..f755aa2 100644
--- a/plugins/filter/api/filter.go
+++ b/plugins/filter/api/filter.go
@@ -24,27 +24,19 @@ import (
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
 
-//   Init()        Initiating stage: Init plugin by config
-//    ||
-//    \/
-//   Process()     Running stage:    Process the input event to convert to new event. During the processing,
-//                                   the method should also tag event type to mark the event category.
-
 // Filter is a plugin interface, that defines new pipeline filters.
 type Filter interface {
 	plugin.Plugin
 
-	// Process would fetch the needed event
+	// Process would put the needed event to the OutputEventContext.
 	Process(context *event.OutputEventContext)
 }
 
-var FilterCategory = reflect.TypeOf((*Filter)(nil)).Elem()
-
 // Get filter plugin.
 func GetFilter(config plugin.DefaultConfig) Filter {
-	return plugin.Get(FilterCategory, config).(Filter)
+	return plugin.Get(reflect.TypeOf((*Filter)(nil)).Elem(), config).(Filter)
 }
 
 func init() {
-	plugin.RegisterPluginCategory(FilterCategory, nil, nil, nil)
+	plugin.RegisterPluginCategory(&plugin.RegInfo{PluginType: reflect.TypeOf((*Filter)(nil)).Elem()})
 }
diff --git a/plugins/forwarder/api/forwarder.go b/plugins/forwarder/api/forwarder.go
index b0509cd..b869159 100644
--- a/plugins/forwarder/api/forwarder.go
+++ b/plugins/forwarder/api/forwarder.go
@@ -27,7 +27,6 @@ import (
 // Forwarder is a plugin interface, that defines new forwarders.
 type Forwarder interface {
 	plugin.Plugin
-
 	// Forward the batch events to the external services, such as Kafka MQ and SkyWalking OAP cluster.
 	Forward(connection interface{}, batch event.BatchEvents) error
 	// ForwardType returns the supported event type.
@@ -44,5 +43,5 @@ func GetForwarder(config map[string]interface{}) Forwarder {
 
 // init register the Forwarder interface
 func init() {
-	plugin.RegisterPluginCategory(reflect.TypeOf((*Forwarder)(nil)).Elem(), nil, nil, nil)
+	plugin.RegisterPluginCategory(&plugin.RegInfo{PluginType: reflect.TypeOf((*Forwarder)(nil)).Elem()})
 }
diff --git a/plugins/parser/api/parser.go b/plugins/parser/api/parser.go
index cba3bca..5677216 100644
--- a/plugins/parser/api/parser.go
+++ b/plugins/parser/api/parser.go
@@ -24,9 +24,6 @@ import (
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
 
-//
-// Collector ==> RawData ==> Parser ==> SerializableEvent
-//
 // Parser is a plugin interface, that defines new Parsers for Collector plugin.
 type Parser interface {
 	plugin.Plugin
@@ -43,5 +40,5 @@ func GetParser(pluginName string, config plugin.DefaultConfig) Parser {
 }
 
 func init() {
-	plugin.RegisterPluginCategory(reflect.TypeOf((*Parser)(nil)).Elem(), nil, nil, nil)
+	plugin.RegisterPluginCategory(&plugin.RegInfo{PluginType: reflect.TypeOf((*Parser)(nil)).Elem()})
 }
diff --git a/plugins/queue/api/queue.go b/plugins/queue/api/queue.go
index f6d4d69..9966326 100644
--- a/plugins/queue/api/queue.go
+++ b/plugins/queue/api/queue.go
@@ -56,12 +56,10 @@ type QueueConsumer interface {
 	Dequeue() (event event.SerializableEvent, offset int64, err error)
 }
 
-var QueueCategory = reflect.TypeOf((*Queue)(nil)).Elem()
-
 func GetQueue(config plugin.DefaultConfig) Queue {
-	return plugin.Get(QueueCategory, config).(Queue)
+	return plugin.Get(reflect.TypeOf((*Queue)(nil)).Elem(), config).(Queue)
 }
 
 func init() {
-	plugin.RegisterPluginCategory(QueueCategory, nil, nil, nil)
+	plugin.RegisterPluginCategory(&plugin.RegInfo{PluginType: reflect.TypeOf((*Queue)(nil)).Elem()})
 }