You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/12/03 00:49:04 UTC
[skywalking-satellite] branch main updated: Add API && Plugin
framework registry (#5)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
The following commit(s) were added to refs/heads/main by this push:
new 3bf78de Add API && Plugin framework registry (#5)
3bf78de is described below
commit 3bf78defe2b83dc229327d67cc5666ef5a59829b
Author: Evan <31...@users.noreply.github.com>
AuthorDate: Thu Dec 3 08:48:56 2020 +0800
Add API && Plugin framework registry (#5)
---
README.md | 1 +
configs/config.yaml | 48 ++++++++++++
docs/project_structue.md | 63 +++++++++++++++
go.mod | 4 +-
go.sum | 5 +-
internal/pkg/event/event.go | 95 ++++++++++++++++++++++
internal/pkg/logger/log.go | 3 +
internal/pkg/logger/log_test.go | 4 +-
internal/pkg/plugin/define.go | 84 ++++++++++++++++++++
internal/pkg/plugin/registry.go | 79 +++++++++++++++++++
internal/satellite/event/event.go | 109 ++++++++++++++++++++++++++
plugins/client/api/client.go | 47 +++++++++++
plugins/client/example/client.go | 67 ++++++++++++++++
plugins/client/example/client_test.go | 71 +++++++++++++++++
plugins/client/grpc/README.md | 1 +
plugins/client/kakka/README.md | 1 +
plugins/collector/api/collector.go | 59 ++++++++++++++
plugins/collector/example/collector.go | 70 +++++++++++++++++
plugins/collector/example/collector_test.go | 71 +++++++++++++++++
plugins/collector/log-grpc/README.md | 1 +
plugins/fallbacker/api/fallbacker.go | 44 +++++++++++
plugins/fallbacker/example/fallbacker.go | 57 ++++++++++++++
plugins/fallbacker/example/fallbacker_test.go | 71 +++++++++++++++++
plugins/filter/api/filter.go | 50 ++++++++++++
plugins/filter/example/filter.go | 54 +++++++++++++
plugins/filter/example/filter_test.go | 71 +++++++++++++++++
plugins/filter/sampling/README.md | 1 +
plugins/forwarder/api/forwarder.go | 57 ++++++++++++++
plugins/forwarder/example/forwarder.go | 62 +++++++++++++++
plugins/forwarder/example/forwarder_test.go | 71 +++++++++++++++++
plugins/forwarder/segment/README.md | 1 +
plugins/parser/api/parser.go | 49 ++++++++++++
plugins/parser/example/parser.go | 62 +++++++++++++++
plugins/parser/example/parser_test.go | 71 +++++++++++++++++
plugins/parser/gork/README.md | 1 +
plugins/queue/api/queue.go | 61 ++++++++++++++
plugins/queue/example/queue.go | 72 +++++++++++++++++
plugins/queue/example/queue_test.go | 71 +++++++++++++++++
plugins/queue/mmap/README.md | 1 +
39 files changed, 1801 insertions(+), 9 deletions(-)
diff --git a/README.md b/README.md
index 7924ec4..8c4b88e 100644
--- a/README.md
+++ b/README.md
@@ -12,6 +12,7 @@ Apache SkyWalking Satellite
# Documentation
- [The first design of Satellite 0.1.0](https://skywalking.apache.org/blog/2020-11-25-skywalking-satellite-0.1.0-design/)
+- [The project structure](./docs/project_structue.md)
# Download
diff --git a/configs/config.yaml b/configs/config.yaml
new file mode 100644
index 0000000..232aecb
--- /dev/null
+++ b/configs/config.yaml
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+Gatherer:
+ - name: segment-receiver
+ type: segment-receiver
+ config:
+ key: value
+ key2: value2
+ queue:
+ type: mmap-queue
+ config:
+ key: value
+ key2: value2
+ processor: processor1
+Processor:
+ - name: processor1
+ filters:
+ - filtername1
+ - filtername2
+ - filtername3
+Sender:
+ client:
+ name: gRPC-client
+ type: gRPC
+ config:
+ key1: value1
+ key2: value2
+ forwarders:
+ - type: segment-forwarder
+ eventType: segment
+ config:
+ key1: value1
+ key2: value2
\ No newline at end of file
diff --git a/docs/project_structue.md b/docs/project_structue.md
new file mode 100644
index 0000000..4a51ad8
--- /dev/null
+++ b/docs/project_structue.md
@@ -0,0 +1,63 @@
+# Project Structure
+- configs: Satellite configs.
+- internal: Core, Api, and common utils.
+- internal/pkg: Sharing with Core and Plugins, such as api and utils.
+- internal/satellite: The core of Satellite.
+- plugins: Contains all plugins.
+- plugins/{type}: Contains the plugins of this {type}. Satellite has 6 plugin types, which are collector, queue, parser, filter, client, and forward.
+- plugins/api: Contains the plugin definition.
+- plugins/{type}/{plugin-name}: Contains the specific plugin, and {plugin-name}-{type} would be registered as the plugin unique name in the registry.
+
+
+```
+.
+├── configs
+│ └── config.yaml
+├── internal
+│ ├── pkg
+│ │ ├── api
+│ │ │ ├── client.go
+│ │ │ ├── collector.go
+│ │ │ ├── event.go
+│ │ │ ├── filter.go
+│ │ │ ├── forwarder.go
+│ │ │ ├── parser.go
+│ │ │ ├── plugin.go
+│ │ │ └── queue.go
+│ │ └── ...
+│ └── satellite
+│ ├── registry
+│ │ └── registry.go
+│ └── ...
+├── plugins
+│ ├── client
+│ │ ├── api
+│ │ │ └── client.go
+│ │ ├── grpc
+│ │ └── kakka
+│ ├── collector
+│ │ ├── api
+│ │ │ └── collector.go
+│ │ ├── example
+│ │ └── log-grpc
+│ │ └── README.md
+│ ├── fallbacker
+│ │ ├── api
+│ │ │ └── fallbacker.go
+│ ├── filter
+│ │ ├── api
+│ │ │ └── filter.go
+│ ├── forwarder
+│ │ ├── api
+│ │ │ └── forwarder.go
+│ ├── parser
+│ │ ├── api
+│ │ │ └── parser.go
+│ │ └── gork
+│ │ └── README.md
+│ └── queue
+│ ├── api
+│ │ └── queue.go
+│ └── mmap
+│ └── README.md
+```
diff --git a/go.mod b/go.mod
index fa4fc28..71fd69d 100644
--- a/go.mod
+++ b/go.mod
@@ -2,6 +2,4 @@ module github.com/apache/skywalking-satellite
go 1.14
-require (
- github.com/sirupsen/logrus v1.7.0
-)
+require github.com/sirupsen/logrus v1.7.0
diff --git a/go.sum b/go.sum
index 4d54e45..4d74a1e 100644
--- a/go.sum
+++ b/go.sum
@@ -1,13 +1,10 @@
-github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
-github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
-github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
-github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
diff --git a/internal/pkg/event/event.go b/internal/pkg/event/event.go
new file mode 100644
index 0000000..c8ff0f2
--- /dev/null
+++ b/internal/pkg/event/event.go
@@ -0,0 +1,95 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package event
+
+import (
+ "fmt"
+ "time"
+)
+
+// The event type.
+const (
+ // Mapping to the type supported by SkyWalking OAP.
+ _ Type = iota
+ MetricsEvent
+ ProfilingEvent
+ SegmentEvent
+ ManagementEvent
+ MeterEvent
+ LogEvent
+)
+
+type Type int32
+
+// Event that implement this interface would be allowed to transmit in the Satellite.
+type Event interface {
+ // Name returns the event name.
+ Name() string
+
+ // Meta is a pair of key and value to record meta data, such as labels.
+ Meta() map[string]string
+
+ // Data returns the wrapped data.
+ Data() interface{}
+
+ // Time returns the event time.
+ Time() time.Time
+
+ // Type is to distinguish different events.
+ Type() Type
+
+ // IsRemote means is a output event when returns true.
+ IsRemote() bool
+}
+
+// SerializableEvent is used in Collector to bridge Queue.
+type SerializableEvent interface {
+ Event
+
+ // ToBytes serialize the event to a byte array.
+ ToBytes() []byte
+
+ // FromBytes deserialize the byte array to an event.
+ FromBytes(bytes []byte) SerializableEvent
+}
+
+// BatchEvents is used by Forwarder to forward.
+type BatchEvents []Event
+
+// OutputEventContext is a container to store the output context.
+type OutputEventContext struct {
+ context map[string]Event
+ Offset int64
+}
+
+// Put puts the incoming event into the context when the event is a remote event.
+func (c *OutputEventContext) Put(event Event) {
+ if event.IsRemote() {
+ c.context[event.Name()] = event
+ }
+}
+
+// Get returns a event in the context. When the eventName does not exist, a error would be returned.
+func (c *OutputEventContext) Get(eventName string) (Event, error) {
+ e, ok := c.context[eventName]
+ if !ok {
+ err := fmt.Errorf("cannot find the event name in OutputEventContext : %s", eventName)
+ return nil, err
+ }
+ return e, nil
+}
diff --git a/internal/pkg/logger/log.go b/internal/pkg/logger/log.go
index 6021bf6..a62128d 100644
--- a/internal/pkg/logger/log.go
+++ b/internal/pkg/logger/log.go
@@ -62,6 +62,9 @@ func Init(opts ...Option) {
if f.timePattern == "" {
f.timePattern = defaultTimePattern
}
+ if !strings.Contains(f.logPattern, "\n") {
+ f.logPattern += "\n"
+ }
Log.SetFormatter(f)
})
}
diff --git a/internal/pkg/logger/log_test.go b/internal/pkg/logger/log_test.go
index 649eea6..d8ad895 100644
--- a/internal/pkg/logger/log_test.go
+++ b/internal/pkg/logger/log_test.go
@@ -38,7 +38,7 @@ func TestFormatter_Format(t *testing.T) {
}{
{
name: "logWithEmptyFields",
- want: []byte("[2020-12-12 12:12:12,012][trace][] - entry1"),
+ want: []byte("[2020-12-12 12:12:12,012][trace][] - entry1\n"),
args: args{
entry: func() *logrus.Entry {
entry := Log.WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
@@ -50,7 +50,7 @@ func TestFormatter_Format(t *testing.T) {
},
{
name: "logWithFields",
- want: []byte("[2020-12-12 12:12:12,012][warning][a=b] - entry2"),
+ want: []byte("[2020-12-12 12:12:12,012][warning][a=b] - entry2\n"),
args: args{
entry: func() *logrus.Entry {
entry := Log.WithField("a", "b").WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
diff --git a/internal/pkg/plugin/define.go b/internal/pkg/plugin/define.go
new file mode 100644
index 0000000..ad95698
--- /dev/null
+++ b/internal/pkg/plugin/define.go
@@ -0,0 +1,84 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plugin
+
+// The following graph illustrates the relationship between different plugin interface in api package.
+//
+//
+// Gatherer Processor
+// ------------------------------- -------------------------------------------
+// | ----------- --------- | | ----------- ----------- |
+// | | Collector | ==> | Queue | |==>| | Filter | ==> ... ==> | Filter | |
+// | | (Parser) | | Mem/File | | | ----------- ----------- |
+// | ----------- ---------- | | || || |
+// -------------------------------- | \/ \/ |
+// | --------------------------------------- |
+// | | OutputEventContext | |
+// | --------------------------------------- |
+// ------------------------------------------- -----------------
+// || ----->| Sharing Client |
+// \/ Sender | -----------------
+// ----------------------------------------|-
+// | --- --- | |
+// | | B | | D | ----------------- | |
+// | | A | | I | |Segment Forwarder|-| |
+// | | T | | S | | (Fallbacker) | | |
+// | | C | | P | ----------------- | |
+// | | H | => | A | | | ===> Kakfa/OAP
+// | | B | | T | => ...... | |
+// | | U | | C | | |
+// | | F | | H | ----------------- | |
+// | | F | | E | | Meter Forwarder|-| |
+// | | E | | R | | (Fallbacker | | |
+// | | R | | | ----------------- | |
+// | --- --- | |
+// ----------------------------------------
+//
+//
+// 1. The Collector plugin would fetch or receive the input data.
+// 2. The Parser plugin would parse the input data to SerializableEvent that is supported
+// to be stored in Queue.
+// 3. The Queue plugin stores the SerializableEvent. However, whether serializing depends on
+// the Queue implements. For example, the serialization is unnecessary when using a Memory
+// Queue. Once an event is pulled by the consumer of Queue, the event will be processed by
+// the filters in Processor.
+// 4. The Filter plugin would process the event to create a new event. Next, the event is passed
+// to the next filter to do the same things until the whole filters are performed. All created
+// events would be stored in the OutputEventContext. However, only the events labeled with
+// RemoteEvent type would be forwarded by Forwarder.
+// 5. After processing, the events in OutputEventContext would be stored in the BatchBuffer. When
+// the timer is triggered or the capacity limit is reached, the events in BatchBuffer would be
+// partitioned by EventType and sent to the different Forwarders, such as Segment Forwarder and
+// Meter Forwarder.
+// 6. The Follower in different Senders would share with the remote client to avoid make duplicate
+// connections and have the same Fallbacker(FallBack strategy) to process data. When all
+// forwarders send success or process success in Fallbacker, the dispatcher would also ack the
+// batch is a success.
+
+// ============================================================================================
+//
+// There are four stages in the lifecycle of Satellite plugins, which are the initial phase,
+// preparing phase, running phase, and closing phase. In the running phase, each plugin has
+// its own interface definition. However, the other three phases have to be defined uniformly.
+
+type Plugin interface {
+ // Description returns the description of the specific plugin.
+ Description() string
+ // Init initialize the specific plugin.
+ InitPlugin(config map[string]interface{})
+}
diff --git a/internal/pkg/plugin/registry.go b/internal/pkg/plugin/registry.go
new file mode 100644
index 0000000..4731d36
--- /dev/null
+++ b/internal/pkg/plugin/registry.go
@@ -0,0 +1,79 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plugin
+
+import (
+ "fmt"
+ "reflect"
+ "sync"
+)
+
+// All plugins is wrote in ./plugins dir. The plugin type would be as the next level dirs,
+// such as collector, client, or queue. And the 3rd level is the plugin name, that is also
+// used as key in pluginRegistry.
+
+// reg is the global plugin registry
+var (
+ reg map[reflect.Type]map[string]reflect.Value
+ lock sync.Mutex
+)
+
+func init() {
+ reg = make(map[reflect.Type]map[string]reflect.Value)
+}
+
+// Add new plugin category. The different plugin category could have same plugin names.
+func AddPluginCategory(pluginCategory reflect.Type) {
+ lock.Lock()
+ defer lock.Unlock()
+ reg[pluginCategory] = map[string]reflect.Value{}
+}
+
+// RegisterPlugin registers the pluginType as plugin.
+// If the plugin is a pointer receiver, please pass a pointer. Otherwise, please pass a value.
+func RegisterPlugin(pluginName string, plugin interface{}) {
+ lock.Lock()
+ defer lock.Unlock()
+ v := reflect.ValueOf(plugin)
+ success := false
+ for pCategory, pReg := range reg {
+ if v.Type().Implements(pCategory) {
+ pReg[pluginName] = v
+ fmt.Printf("register %s %s successfully ", pluginName, v.Type().String())
+ success = true
+ }
+ }
+ if !success {
+ fmt.Printf("this type of %s is not supported to register : %s", pluginName, v.Type().String())
+ }
+}
+
+// Get the specific plugin according to the pluginCategory and pluginName.
+func Get(pluginCategory reflect.Type, pluginName string, config map[string]interface{}) Plugin {
+ value, ok := reg[pluginCategory][pluginName]
+ if !ok {
+ panic(fmt.Errorf("cannot find %s plugin, and the category of plugin is %s", pluginName, pluginCategory))
+ }
+ t := value.Type()
+ if t.Kind() == reflect.Ptr {
+ t = t.Elem()
+ }
+ plugin := reflect.New(t).Interface().(Plugin)
+ plugin.InitPlugin(config)
+ return plugin
+}
diff --git a/internal/satellite/event/event.go b/internal/satellite/event/event.go
new file mode 100644
index 0000000..a53720b
--- /dev/null
+++ b/internal/satellite/event/event.go
@@ -0,0 +1,109 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package event
+
+import (
+ "time"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/event"
+)
+
+// Event defines the common fields.
+type Event struct {
+ name string
+ timestamp time.Time
+ meta map[string]string
+ eventType event.Type
+ remote bool
+}
+
+// ETBFunc serialize event to bytes.
+type ETBFunc func(event event.SerializableEvent) []byte
+
+// BToFunc deserialize bytes to event.
+type BToFunc func(bytes []byte) event.SerializableEvent
+
+// StructuredEvent works when the data is a struct type.
+type StructuredEvent struct {
+ Event
+ data interface{}
+}
+
+// StructuredEvent works when the data is not a struct type.
+type UnstructuredEvent struct {
+ Event
+ data map[string]interface{}
+}
+
+// StructuredEvent works when the data is a struct type in the collector.
+type StructuredSerializableEvent struct {
+ StructuredEvent
+ etb ETBFunc
+ bte BToFunc
+}
+
+// StructuredEvent works when the data is not a struct type in the collector.
+type UnstructuredSerializableEvent struct {
+ UnstructuredEvent
+ etb ETBFunc
+ bte BToFunc
+}
+
+func (s *Event) Name() string {
+ return s.name
+}
+
+func (s *Event) Meta() map[string]string {
+ return s.meta
+}
+
+func (s *Event) Time() time.Time {
+ return s.timestamp
+}
+
+func (s *Event) Type() event.Type {
+ return s.eventType
+}
+
+func (s *Event) IsRemote() bool {
+ return s.remote
+}
+
+func (u *StructuredEvent) Data() interface{} {
+ return u.data
+}
+
+func (u *UnstructuredEvent) Data() interface{} {
+ return u.data
+}
+
+func (s *StructuredSerializableEvent) ToBytes() []byte {
+ return s.etb(s)
+}
+
+func (s *StructuredSerializableEvent) FromBytes(bytes []byte) event.SerializableEvent {
+ return s.bte(bytes)
+}
+
+func (u *UnstructuredSerializableEvent) ToBytes() []byte {
+ return u.etb(u)
+}
+
+func (u *UnstructuredSerializableEvent) FromBytes(bytes []byte) event.SerializableEvent {
+ return u.bte(bytes)
+}
diff --git a/plugins/client/api/client.go b/plugins/client/api/client.go
new file mode 100644
index 0000000..51d2b65
--- /dev/null
+++ b/plugins/client/api/client.go
@@ -0,0 +1,47 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package api
+
+import (
+ "reflect"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+// Client is a plugin interface, that defines new clients, such as gRPC client and Kafka client.
+type Client interface {
+ plugin.Plugin
+
+ // Prepare would make connection with outer service.
+ Prepare()
+ // GetConnection returns the connected client to publish events.
+ GetConnectedClient() interface{}
+ // Close the connection with outer service.
+ Close()
+}
+
+var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
+
+// Get client plugin.
+func GetClient(pluginName string, config map[string]interface{}) Client {
+ return plugin.Get(ClientCategory, pluginName, config).(Client)
+}
+
+func init() {
+ plugin.AddPluginCategory(ClientCategory)
+}
diff --git a/plugins/client/example/client.go b/plugins/client/example/client.go
new file mode 100644
index 0000000..77b29c7
--- /dev/null
+++ b/plugins/client/example/client.go
@@ -0,0 +1,67 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+type demoClient struct {
+ a string
+}
+type demoClient2 struct {
+ a string
+}
+type demoClient3 struct {
+ a string
+}
+
+func (d demoClient) Description() string {
+ panic("implement me")
+}
+
+func (d demoClient) InitPlugin(config map[string]interface{}) {
+
+}
+
+func (d demoClient) Prepare() {
+ panic("implement me")
+}
+
+func (d demoClient) GetConnectedClient() interface{} {
+ panic("implement me")
+}
+
+func (d demoClient) Close() {
+ panic("implement me")
+}
+
+func (d *demoClient2) Description() string {
+ panic("implement me")
+}
+
+func (d *demoClient2) InitPlugin(config map[string]interface{}) {
+}
+
+func (d *demoClient2) Prepare() {
+ panic("implement me")
+}
+
+func (d *demoClient2) GetConnectedClient() interface{} {
+ panic("implement me")
+}
+
+func (d *demoClient2) Close() {
+ panic("implement me")
+}
diff --git a/plugins/client/example/client_test.go b/plugins/client/example/client_test.go
new file mode 100644
index 0000000..a9e355e
--- /dev/null
+++ b/plugins/client/example/client_test.go
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import (
+ "testing"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+ "github.com/apache/skywalking-satellite/plugins/client/api"
+)
+
+func Test_Register(t *testing.T) {
+ tests := []struct {
+ name string
+ args interface{}
+ panic bool
+ }{
+ {
+ name: "demoClient",
+ args: demoClient{
+ a: "s",
+ },
+ panic: false,
+ },
+ {
+ name: "demoClient2",
+ args: &demoClient2{
+ a: "s",
+ },
+ panic: false,
+ },
+ {
+ name: "demoClient3",
+ args: demoClient3{
+ a: "s",
+ },
+ panic: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ plugin.RegisterPlugin(tt.name, tt.args)
+ assertPanic(t, tt.name, nil, tt.panic)
+ })
+ }
+}
+
+func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
+ defer func() {
+ if r := recover(); r != nil && !existPanic {
+ t.Errorf("the plugin %s is not pass", name)
+ }
+ }()
+ api.GetClient(name, config)
+}
diff --git a/plugins/client/grpc/README.md b/plugins/client/grpc/README.md
new file mode 100644
index 0000000..3f03ea1
--- /dev/null
+++ b/plugins/client/grpc/README.md
@@ -0,0 +1 @@
+# Plugin description
\ No newline at end of file
diff --git a/plugins/client/kakka/README.md b/plugins/client/kakka/README.md
new file mode 100644
index 0000000..3f03ea1
--- /dev/null
+++ b/plugins/client/kakka/README.md
@@ -0,0 +1 @@
+# Plugin description
\ No newline at end of file
diff --git a/plugins/collector/api/collector.go b/plugins/collector/api/collector.go
new file mode 100644
index 0000000..374d63a
--- /dev/null
+++ b/plugins/collector/api/collector.go
@@ -0,0 +1,59 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package api
+
+import (
+ "reflect"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/event"
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+// Init() Initial stage: Init plugin by config
+// ||
+// \/
+// Prepare() Preparing stage: Prepare the collector, such as build connection with SkyWalking javaagent.
+// ||
+// \/
+// Next() Running stage: When Collector collect a data, the data would be fetched by the upstream
+// || component through this method.
+// \/
+// Close() Closing stage: Close the Collector, such as close connection with SkyWalking javaagent.
+
+// Collector is a plugin interface, that defines new collectors.
+type Collector interface {
+ plugin.Plugin
+
+ // Prepare creates a listen or reader to gather data.
+ Prepare()
+ // Next return the data from the input.
+ Next() (event.SerializableEvent, error)
+ // Close would close collector.
+ Close()
+}
+
+var CollectorCategory = reflect.TypeOf((*Collector)(nil)).Elem()
+
+// Get collector plugin.
+func GetCollector(pluginName string, config map[string]interface{}) Collector {
+ return plugin.Get(CollectorCategory, pluginName, config).(Collector)
+}
+
+func init() {
+ plugin.AddPluginCategory(CollectorCategory)
+}
diff --git a/plugins/collector/example/collector.go b/plugins/collector/example/collector.go
new file mode 100644
index 0000000..40b9b8b
--- /dev/null
+++ b/plugins/collector/example/collector.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import "github.com/apache/skywalking-satellite/internal/pkg/event"
+
+type demoCollector struct {
+ a string
+}
+
+type demoCollector2 struct {
+ a string
+}
+
+type demoCollector3 struct {
+ a string
+}
+
+func (d *demoCollector) Description() string {
+ panic("implement me")
+}
+
+func (d *demoCollector) InitPlugin(config map[string]interface{}) {
+}
+
+func (d *demoCollector) Prepare() {
+ panic("implement me")
+}
+
+func (d *demoCollector) Next() (event.SerializableEvent, error) {
+ panic("implement me")
+}
+
+func (d *demoCollector) Close() {
+ panic("implement me")
+}
+
+func (d demoCollector2) Description() string {
+ panic("implement me")
+}
+
+func (d demoCollector2) InitPlugin(config map[string]interface{}) {
+}
+
+func (d demoCollector2) Prepare() {
+ panic("implement me")
+}
+
+func (d demoCollector2) Next() (event.SerializableEvent, error) {
+ panic("implement me")
+}
+
+func (d demoCollector2) Close() {
+ panic("implement me")
+}
diff --git a/plugins/collector/example/collector_test.go b/plugins/collector/example/collector_test.go
new file mode 100644
index 0000000..e73f3bd
--- /dev/null
+++ b/plugins/collector/example/collector_test.go
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import (
+ "testing"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+ "github.com/apache/skywalking-satellite/plugins/collector/api"
+)
+
+func Test_Register(t *testing.T) {
+ tests := []struct {
+ name string
+ args interface{}
+ panic bool
+ }{
+ {
+ name: "demoCollector",
+ args: &demoCollector{
+ a: "s",
+ },
+ panic: false,
+ },
+ {
+ name: "demoCollector2",
+ args: demoCollector2{
+ a: "s",
+ },
+ panic: false,
+ },
+ {
+ name: "demoCollector3",
+ args: demoCollector3{
+ a: "s",
+ },
+ panic: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ plugin.RegisterPlugin(tt.name, tt.args)
+ assertPanic(t, tt.name, nil, tt.panic)
+ })
+ }
+}
+
+func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
+ defer func() {
+ if r := recover(); r != nil && !existPanic {
+ t.Errorf("the plugin %s is not pass", name)
+ }
+ }()
+ api.GetCollector(name, config)
+}
diff --git a/plugins/collector/log-grpc/README.md b/plugins/collector/log-grpc/README.md
new file mode 100644
index 0000000..3f03ea1
--- /dev/null
+++ b/plugins/collector/log-grpc/README.md
@@ -0,0 +1 @@
+# Plugin description
\ No newline at end of file
diff --git a/plugins/fallbacker/api/fallbacker.go b/plugins/fallbacker/api/fallbacker.go
new file mode 100644
index 0000000..bde56ef
--- /dev/null
+++ b/plugins/fallbacker/api/fallbacker.go
@@ -0,0 +1,44 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package api
+
+import (
+ "reflect"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/event"
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+// Fallbacker is a plugin interface, that defines some fallback strategies.
+type Fallbacker interface {
+ plugin.Plugin
+
+ // FallBack returns nil when finishing a successful process and returns a new Fallbacker when failure.
+ FallBack(batch event.BatchEvents) Fallbacker
+}
+
+var FallbackerCategory = reflect.TypeOf((*Fallbacker)(nil)).Elem()
+
+// Get Fallbacker plugin.
+func GetFallbacker(pluginName string, config map[string]interface{}) Fallbacker {
+ return plugin.Get(FallbackerCategory, pluginName, config).(Fallbacker)
+}
+
+func init() {
+ plugin.AddPluginCategory(FallbackerCategory)
+}
diff --git a/plugins/fallbacker/example/fallbacker.go b/plugins/fallbacker/example/fallbacker.go
new file mode 100644
index 0000000..b933e96
--- /dev/null
+++ b/plugins/fallbacker/example/fallbacker.go
@@ -0,0 +1,57 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import (
+ "github.com/apache/skywalking-satellite/internal/pkg/event"
+ "github.com/apache/skywalking-satellite/plugins/fallbacker/api"
+)
+
+type demoFallbacker struct {
+ a string
+}
+
+type demoFallbacker2 struct {
+ a string
+}
+
+type demoFallbacker3 struct {
+ a string
+}
+
+func (d *demoFallbacker) Description() string {
+ panic("implement me")
+}
+
+func (d *demoFallbacker) InitPlugin(config map[string]interface{}) {
+}
+
+func (d *demoFallbacker) FallBack(batch event.BatchEvents) api.Fallbacker {
+ panic("implement me")
+}
+
+func (d demoFallbacker2) Description() string {
+ panic("implement me")
+}
+
+func (d demoFallbacker2) InitPlugin(config map[string]interface{}) {
+}
+
+func (d demoFallbacker2) FallBack(batch event.BatchEvents) api.Fallbacker {
+ panic("implement me")
+}
diff --git a/plugins/fallbacker/example/fallbacker_test.go b/plugins/fallbacker/example/fallbacker_test.go
new file mode 100644
index 0000000..f5f445f
--- /dev/null
+++ b/plugins/fallbacker/example/fallbacker_test.go
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import (
+ "testing"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+ "github.com/apache/skywalking-satellite/plugins/fallbacker/api"
+)
+
+func Test_Register(t *testing.T) {
+ tests := []struct {
+ name string
+ args interface{}
+ panic bool
+ }{
+ {
+ name: "demoFallbacker",
+ args: &demoFallbacker{
+ a: "s",
+ },
+ panic: false,
+ },
+ {
+ name: "demoFallbacker2",
+ args: demoFallbacker2{
+ a: "s",
+ },
+ panic: false,
+ },
+ {
+ name: "demoFallbacker3",
+ args: demoFallbacker3{
+ a: "s",
+ },
+ panic: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ plugin.RegisterPlugin(tt.name, tt.args)
+ assertPanic(t, tt.name, nil, tt.panic)
+ })
+ }
+}
+
+func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
+ defer func() {
+ if r := recover(); r != nil && !existPanic {
+ t.Errorf("the plugin %s is not pass", name)
+ }
+ }()
+ api.GetFallbacker(name, config)
+}
diff --git a/plugins/filter/api/filter.go b/plugins/filter/api/filter.go
new file mode 100644
index 0000000..0ed43db
--- /dev/null
+++ b/plugins/filter/api/filter.go
@@ -0,0 +1,50 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package api
+
+import (
+ "reflect"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/event"
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+// Init() Initiating stage: Init plugin by config
+// ||
+// \/
+// Process() Running stage: Process the input event to convert to new event. During the processing,
+// the method should also tag event type to mark the event category.
+
+// Filter is a plugin interface, that defines new pipeline filters.
+type Filter interface {
+ plugin.Plugin
+
+ // Process produces a new event by processing incoming event.
+ Process(in event.Event) event.Event
+}
+
+var FilterCategory = reflect.TypeOf((*Filter)(nil)).Elem()
+
+// Get filter plugin.
+func GetFilter(pluginName string, config map[string]interface{}) Filter {
+ return plugin.Get(FilterCategory, pluginName, config).(Filter)
+}
+
+func init() {
+ plugin.AddPluginCategory(FilterCategory)
+}
diff --git a/plugins/filter/example/filter.go b/plugins/filter/example/filter.go
new file mode 100644
index 0000000..14abb69
--- /dev/null
+++ b/plugins/filter/example/filter.go
@@ -0,0 +1,54 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import "github.com/apache/skywalking-satellite/internal/pkg/event"
+
+type demoFilter struct {
+ a string
+}
+
+type demoFilter2 struct {
+ a string
+}
+
+type demoFilter3 struct {
+ a string
+}
+
+func (d *demoFilter) Description() string {
+ panic("implement me")
+}
+
+func (d *demoFilter) InitPlugin(config map[string]interface{}) {
+}
+
+func (d *demoFilter) Process(in event.Event) event.Event {
+ panic("implement me")
+}
+
+func (d demoFilter2) Description() string {
+ panic("implement me")
+}
+
+func (d demoFilter2) InitPlugin(config map[string]interface{}) {
+}
+
+func (d demoFilter2) Process(in event.Event) event.Event {
+ panic("implement me")
+}
diff --git a/plugins/filter/example/filter_test.go b/plugins/filter/example/filter_test.go
new file mode 100644
index 0000000..aae4371
--- /dev/null
+++ b/plugins/filter/example/filter_test.go
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import (
+ "testing"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+ "github.com/apache/skywalking-satellite/plugins/filter/api"
+)
+
+func Test_Register(t *testing.T) {
+ tests := []struct {
+ name string
+ args interface{}
+ panic bool
+ }{
+ {
+ name: "demoFilter",
+ args: &demoFilter{
+ a: "s",
+ },
+ panic: false,
+ },
+ {
+ name: "demoFilter2",
+ args: demoFilter2{
+ a: "s",
+ },
+ panic: false,
+ },
+ {
+ name: "demoFilter3",
+ args: demoFilter3{
+ a: "s",
+ },
+ panic: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ plugin.RegisterPlugin(tt.name, tt.args)
+ assertPanic(t, tt.name, nil, tt.panic)
+ })
+ }
+}
+
+func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
+ defer func() {
+ if r := recover(); r != nil && !existPanic {
+ t.Errorf("the plugin %s is not pass", name)
+ }
+ }()
+ api.GetFilter(name, config)
+}
diff --git a/plugins/filter/sampling/README.md b/plugins/filter/sampling/README.md
new file mode 100644
index 0000000..3f03ea1
--- /dev/null
+++ b/plugins/filter/sampling/README.md
@@ -0,0 +1 @@
+# Plugin description
\ No newline at end of file
diff --git a/plugins/forwarder/api/forwarder.go b/plugins/forwarder/api/forwarder.go
new file mode 100644
index 0000000..9e99f14
--- /dev/null
+++ b/plugins/forwarder/api/forwarder.go
@@ -0,0 +1,57 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package api
+
+import (
+ "reflect"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/event"
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+// Init() Initiating stage: Init plugin by config
+// ||
+// \/
+// Prepare() Preparing stage: Prepare the Forwarder, such as get remote client.
+// ||
+// \/
+// Forward() Running stage: Forward the batch events
+// ||
+// \/
+// Close() Closing stage: Close the Collector, such as close connection with SkyWalking javaagent.
+
+// Forwarder is a plugin interface, that defines new forwarders.
+type Forwarder interface {
+ plugin.Plugin
+
+ // Forward the batch events to the external services, such as Kafka MQ and SkyWalking OAP cluster.
+ Forward(batch event.BatchEvents)
+
+ // ForwardType returns the supporting event type that could be forwarded.
+ ForwardType() event.Type
+}
+
+var ForwarderCategory = reflect.TypeOf((*Forwarder)(nil)).Elem()
+
+func GetForwarder(pluginName string, config map[string]interface{}) Forwarder {
+ return plugin.Get(ForwarderCategory, pluginName, config).(Forwarder)
+}
+
+func init() {
+ plugin.AddPluginCategory(ForwarderCategory)
+}
diff --git a/plugins/forwarder/example/forwarder.go b/plugins/forwarder/example/forwarder.go
new file mode 100644
index 0000000..637f9e1
--- /dev/null
+++ b/plugins/forwarder/example/forwarder.go
@@ -0,0 +1,62 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import "github.com/apache/skywalking-satellite/internal/pkg/event"
+
+type demoForwarder struct {
+ a string
+}
+
+type demoForwarder2 struct {
+ a string
+}
+
+type demoForwarder3 struct {
+ a string
+}
+
+func (d *demoForwarder) Description() string {
+ panic("implement me")
+}
+
+func (d *demoForwarder) InitPlugin(config map[string]interface{}) {
+}
+
+func (d *demoForwarder) Forward(batch event.BatchEvents) {
+ panic("implement me")
+}
+
+func (d demoForwarder2) Description() string {
+ panic("implement me")
+}
+
+func (d demoForwarder2) InitPlugin(config map[string]interface{}) {
+}
+
+func (d demoForwarder2) Forward(batch event.BatchEvents) {
+ panic("implement me")
+}
+
+func (d demoForwarder2) ForwardType() event.Type {
+ panic("implement me")
+}
+
+func (d *demoForwarder) ForwardType() event.Type {
+ panic("implement me")
+}
diff --git a/plugins/forwarder/example/forwarder_test.go b/plugins/forwarder/example/forwarder_test.go
new file mode 100644
index 0000000..28de35d
--- /dev/null
+++ b/plugins/forwarder/example/forwarder_test.go
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import (
+ "testing"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+ "github.com/apache/skywalking-satellite/plugins/forwarder/api"
+)
+
+func Test_Register(t *testing.T) {
+ tests := []struct {
+ name string
+ args interface{}
+ panic bool
+ }{
+ {
+ name: "demoForwarder",
+ args: &demoForwarder{
+ a: "s",
+ },
+ panic: false,
+ },
+ {
+ name: "demoForwarder2",
+ args: demoForwarder2{
+ a: "s",
+ },
+ panic: false,
+ },
+ {
+ name: "demoForwarder3",
+ args: demoForwarder3{
+ a: "s",
+ },
+ panic: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ plugin.RegisterPlugin(tt.name, tt.args)
+ assertPanic(t, tt.name, nil, tt.panic)
+ })
+ }
+}
+
+func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
+ defer func() {
+ if r := recover(); r != nil && !existPanic {
+ t.Errorf("the plugin %s is not pass", name)
+ }
+ }()
+ api.GetForwarder(name, config)
+}
diff --git a/plugins/forwarder/segment/README.md b/plugins/forwarder/segment/README.md
new file mode 100644
index 0000000..3f03ea1
--- /dev/null
+++ b/plugins/forwarder/segment/README.md
@@ -0,0 +1 @@
+# Plugin description
\ No newline at end of file
diff --git a/plugins/parser/api/parser.go b/plugins/parser/api/parser.go
new file mode 100644
index 0000000..3ca5ebe
--- /dev/null
+++ b/plugins/parser/api/parser.go
@@ -0,0 +1,49 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package api
+
+import (
+ "reflect"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/event"
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+//
+// Collector ==> RawData ==> Parser ==> SerializableEvent
+//
+// Parser is a plugin interface, that defines new Parsers for Collector plugin.
+type Parser interface {
+ plugin.Plugin
+
+ // ParseBytes parse the byte buffer into events.
+ ParseBytes(bytes []byte) ([]event.SerializableEvent, error)
+
+ // ParseStr parse the string into events.
+ ParseStr(str string) ([]event.SerializableEvent, error)
+}
+
+var ParserCategory = reflect.TypeOf((*Parser)(nil)).Elem()
+
+func GetParser(pluginName string, config map[string]interface{}) Parser {
+ return plugin.Get(ParserCategory, pluginName, config).(Parser)
+}
+
+func init() {
+ plugin.AddPluginCategory(ParserCategory)
+}
diff --git a/plugins/parser/example/parser.go b/plugins/parser/example/parser.go
new file mode 100644
index 0000000..839956f
--- /dev/null
+++ b/plugins/parser/example/parser.go
@@ -0,0 +1,62 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import "github.com/apache/skywalking-satellite/internal/pkg/event"
+
+type demoParser struct {
+ a string
+}
+
+type demoParser2 struct {
+ a string
+}
+
+type demoParser3 struct {
+ a string
+}
+
+func (d *demoParser) Description() string {
+ panic("implement me")
+}
+
+func (d *demoParser) InitPlugin(config map[string]interface{}) {
+}
+
+func (d *demoParser) ParseBytes(bytes []byte) ([]event.SerializableEvent, error) {
+ panic("implement me")
+}
+
+func (d *demoParser) ParseStr(str string) ([]event.SerializableEvent, error) {
+ panic("implement me")
+}
+
+func (d demoParser2) Description() string {
+ panic("implement me")
+}
+
+func (d demoParser2) InitPlugin(config map[string]interface{}) {
+}
+
+func (d demoParser2) ParseBytes(bytes []byte) ([]event.SerializableEvent, error) {
+ panic("implement me")
+}
+
+func (d demoParser2) ParseStr(str string) ([]event.SerializableEvent, error) {
+ panic("implement me")
+}
diff --git a/plugins/parser/example/parser_test.go b/plugins/parser/example/parser_test.go
new file mode 100644
index 0000000..b577f7d
--- /dev/null
+++ b/plugins/parser/example/parser_test.go
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import (
+ "testing"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+ "github.com/apache/skywalking-satellite/plugins/parser/api"
+)
+
+func Test_Register(t *testing.T) {
+ tests := []struct {
+ name string
+ args interface{}
+ panic bool
+ }{
+ {
+ name: "demoParser",
+ args: &demoParser{
+ a: "s",
+ },
+ panic: false,
+ },
+ {
+ name: "demoParser2",
+ args: demoParser2{
+ a: "s",
+ },
+ panic: false,
+ },
+ {
+ name: "demoParser3",
+ args: demoParser3{
+ a: "s",
+ },
+ panic: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ plugin.RegisterPlugin(tt.name, tt.args)
+ assertPanic(t, tt.name, nil, tt.panic)
+ })
+ }
+}
+
+func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
+ defer func() {
+ if r := recover(); r != nil && !existPanic {
+ t.Errorf("the plugin %s is not pass", name)
+ }
+ }()
+ api.GetParser(name, config)
+}
diff --git a/plugins/parser/gork/README.md b/plugins/parser/gork/README.md
new file mode 100644
index 0000000..3f03ea1
--- /dev/null
+++ b/plugins/parser/gork/README.md
@@ -0,0 +1 @@
+# Plugin description
\ No newline at end of file
diff --git a/plugins/queue/api/queue.go b/plugins/queue/api/queue.go
new file mode 100644
index 0000000..9b95156
--- /dev/null
+++ b/plugins/queue/api/queue.go
@@ -0,0 +1,61 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package api
+
+import (
+ "reflect"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/event"
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+// Queue is a plugin interface, that defines new queues.
+type Queue interface {
+ plugin.Plugin
+
+ // Publisher get the only publisher for the current queue.
+ Publisher() QueuePublisher
+
+ // Consumer get the only consumer for the current queue.
+ Consumer() QueueConsumer
+
+ // Close would close the queue.
+ Close()
+}
+
+// QueuePublisher is a plugin interface, that defines new queue publishers.
+type QueuePublisher interface {
+ // Enqueue push a inputEvent into the queue.
+ Enqueue(event *event.SerializableEvent) error
+}
+
+// QueueConsumer is a plugin interface, that defines new queue consumers.
+type QueueConsumer interface {
+ // Dequeue pop an event form the Queue. When the queue is empty, the method would be blocked.
+ Dequeue() (event *event.SerializableEvent, offset int64, err error)
+}
+
+var QueueCategory = reflect.TypeOf((*Queue)(nil)).Elem()
+
+func GetQueue(pluginName string, config map[string]interface{}) Queue {
+ return plugin.Get(QueueCategory, pluginName, config).(Queue)
+}
+
+func init() {
+ plugin.AddPluginCategory(QueueCategory)
+}
diff --git a/plugins/queue/example/queue.go b/plugins/queue/example/queue.go
new file mode 100644
index 0000000..5a3055a
--- /dev/null
+++ b/plugins/queue/example/queue.go
@@ -0,0 +1,72 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import (
+ "github.com/apache/skywalking-satellite/plugins/queue/api"
+)
+
+type demoQueue struct {
+ a string
+}
+
+type demoQueue2 struct {
+ a string
+}
+
+type demoQueue3 struct {
+ a string
+}
+
+func (d *demoQueue) Description() string {
+ panic("implement me")
+}
+
+func (d *demoQueue) InitPlugin(config map[string]interface{}) {
+}
+
+func (d *demoQueue) Publisher() api.QueuePublisher {
+ panic("implement me")
+}
+
+func (d *demoQueue) Consumer() api.QueueConsumer {
+ panic("implement me")
+}
+
+func (d *demoQueue) Close() {
+ panic("implement me")
+}
+
+func (d demoQueue2) Description() string {
+ panic("implement me")
+}
+
+func (d demoQueue2) InitPlugin(config map[string]interface{}) {
+}
+
+func (d demoQueue2) Publisher() api.QueuePublisher {
+ panic("implement me")
+}
+
+func (d demoQueue2) Consumer() api.QueueConsumer {
+ panic("implement me")
+}
+
+func (d demoQueue2) Close() {
+ panic("implement me")
+}
diff --git a/plugins/queue/example/queue_test.go b/plugins/queue/example/queue_test.go
new file mode 100644
index 0000000..0afd20d
--- /dev/null
+++ b/plugins/queue/example/queue_test.go
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import (
+ "testing"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+ "github.com/apache/skywalking-satellite/plugins/queue/api"
+)
+
+func Test_Register(t *testing.T) {
+ tests := []struct {
+ name string
+ args interface{}
+ panic bool
+ }{
+ {
+ name: "demoQueue",
+ args: &demoQueue{
+ a: "s",
+ },
+ panic: false,
+ },
+ {
+ name: "demoQueue2",
+ args: demoQueue2{
+ a: "s",
+ },
+ panic: false,
+ },
+ {
+ name: "demoQueue3",
+ args: demoQueue3{
+ a: "s",
+ },
+ panic: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ plugin.RegisterPlugin(tt.name, tt.args)
+ assertPanic(t, tt.name, nil, tt.panic)
+ })
+ }
+}
+
+func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
+ defer func() {
+ if r := recover(); r != nil && !existPanic {
+ t.Errorf("the plugin %s is not pass", name)
+ }
+ }()
+ api.GetQueue(name, config)
+}
diff --git a/plugins/queue/mmap/README.md b/plugins/queue/mmap/README.md
new file mode 100644
index 0000000..3f03ea1
--- /dev/null
+++ b/plugins/queue/mmap/README.md
@@ -0,0 +1 @@
+# Plugin description
\ No newline at end of file