You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/12/03 00:49:04 UTC

[skywalking-satellite] branch main updated: Add API && Plugin framework registry (#5)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git


The following commit(s) were added to refs/heads/main by this push:
     new 3bf78de  Add API && Plugin framework registry (#5)
3bf78de is described below

commit 3bf78defe2b83dc229327d67cc5666ef5a59829b
Author: Evan <31...@users.noreply.github.com>
AuthorDate: Thu Dec 3 08:48:56 2020 +0800

    Add API && Plugin framework registry (#5)
---
 README.md                                     |   1 +
 configs/config.yaml                           |  48 ++++++++++++
 docs/project_structue.md                      |  63 +++++++++++++++
 go.mod                                        |   4 +-
 go.sum                                        |   5 +-
 internal/pkg/event/event.go                   |  95 ++++++++++++++++++++++
 internal/pkg/logger/log.go                    |   3 +
 internal/pkg/logger/log_test.go               |   4 +-
 internal/pkg/plugin/define.go                 |  84 ++++++++++++++++++++
 internal/pkg/plugin/registry.go               |  79 +++++++++++++++++++
 internal/satellite/event/event.go             | 109 ++++++++++++++++++++++++++
 plugins/client/api/client.go                  |  47 +++++++++++
 plugins/client/example/client.go              |  67 ++++++++++++++++
 plugins/client/example/client_test.go         |  71 +++++++++++++++++
 plugins/client/grpc/README.md                 |   1 +
 plugins/client/kakka/README.md                |   1 +
 plugins/collector/api/collector.go            |  59 ++++++++++++++
 plugins/collector/example/collector.go        |  70 +++++++++++++++++
 plugins/collector/example/collector_test.go   |  71 +++++++++++++++++
 plugins/collector/log-grpc/README.md          |   1 +
 plugins/fallbacker/api/fallbacker.go          |  44 +++++++++++
 plugins/fallbacker/example/fallbacker.go      |  57 ++++++++++++++
 plugins/fallbacker/example/fallbacker_test.go |  71 +++++++++++++++++
 plugins/filter/api/filter.go                  |  50 ++++++++++++
 plugins/filter/example/filter.go              |  54 +++++++++++++
 plugins/filter/example/filter_test.go         |  71 +++++++++++++++++
 plugins/filter/sampling/README.md             |   1 +
 plugins/forwarder/api/forwarder.go            |  57 ++++++++++++++
 plugins/forwarder/example/forwarder.go        |  62 +++++++++++++++
 plugins/forwarder/example/forwarder_test.go   |  71 +++++++++++++++++
 plugins/forwarder/segment/README.md           |   1 +
 plugins/parser/api/parser.go                  |  49 ++++++++++++
 plugins/parser/example/parser.go              |  62 +++++++++++++++
 plugins/parser/example/parser_test.go         |  71 +++++++++++++++++
 plugins/parser/gork/README.md                 |   1 +
 plugins/queue/api/queue.go                    |  61 ++++++++++++++
 plugins/queue/example/queue.go                |  72 +++++++++++++++++
 plugins/queue/example/queue_test.go           |  71 +++++++++++++++++
 plugins/queue/mmap/README.md                  |   1 +
 39 files changed, 1801 insertions(+), 9 deletions(-)

diff --git a/README.md b/README.md
index 7924ec4..8c4b88e 100644
--- a/README.md
+++ b/README.md
@@ -12,6 +12,7 @@ Apache SkyWalking Satellite
 
 # Documentation
 - [The first design of Satellite 0.1.0](https://skywalking.apache.org/blog/2020-11-25-skywalking-satellite-0.1.0-design/)
+- [The project structure](./docs/project_structue.md)
 
 # Download
 
diff --git a/configs/config.yaml b/configs/config.yaml
new file mode 100644
index 0000000..232aecb
--- /dev/null
+++ b/configs/config.yaml
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+Gatherer:
+  - name: segment-receiver
+    type: segment-receiver
+    config:
+      key: value
+      key2: value2
+    queue:
+      type: mmap-queue
+      config:
+        key: value
+        key2: value2
+    processor: processor1
+Processor:
+  - name: processor1
+    filters:
+      - filtername1
+      - filtername2
+      - filtername3
+Sender:
+  client:
+    name: gRPC-client
+    type: gRPC
+    config:
+      key1: value1
+      key2: value2
+  forwarders:
+    - type: segment-forwarder
+      eventType: segment
+      config:
+        key1: value1
+        key2: value2
\ No newline at end of file
diff --git a/docs/project_structue.md b/docs/project_structue.md
new file mode 100644
index 0000000..4a51ad8
--- /dev/null
+++ b/docs/project_structue.md
@@ -0,0 +1,63 @@
+# Project Structure
+- configs: Satellite configs.
+- internal: Core, Api, and common utils.
+- internal/pkg: Sharing with Core and Plugins, such as api and utils.
+- internal/satellite: The core of Satellite.
+- plugins: Contains all plugins.
+- plugins/{type}: Contains the plugins of this {type}. Satellite has 6 plugin types, which are collector, queue, parser, filter, client, and forward.
+- plugins/api: Contains the plugin definition.
+- plugins/{type}/{plugin-name}: Contains the specific plugin, and {plugin-name}-{type} would be registered as the plugin unique name in the registry. 
+
+
+```
+.
+├── configs
+│   └── config.yaml
+├── internal
+│   ├── pkg
+│   │   ├── api
+│   │   │   ├── client.go
+│   │   │   ├── collector.go
+│   │   │   ├── event.go
+│   │   │   ├── filter.go
+│   │   │   ├── forwarder.go
+│   │   │   ├── parser.go
+│   │   │   ├── plugin.go
+│   │   │   └── queue.go
+│   │   └── ...
+│   └── satellite
+│       ├── registry
+│       │   └── registry.go
+│       └── ...
+├── plugins
+│   ├── client
+│   │   ├── api
+│   │   │   └── client.go
+│   │   ├── grpc
+│   │   └── kakka
+│   ├── collector
+│   │   ├── api
+│   │   │   └── collector.go
+│   │   ├── example
+│   │   └── log-grpc
+│   │       └── README.md
+│   ├── fallbacker
+│   │   ├── api
+│   │   │   └── fallbacker.go
+│   ├── filter
+│   │   ├── api
+│   │   │   └── filter.go
+│   ├── forwarder
+│   │   ├── api
+│   │   │   └── forwarder.go
+│   ├── parser
+│   │   ├── api
+│   │   │   └── parser.go
+│   │   └── gork
+│   │       └── README.md
+│   └── queue
+│       ├── api
+│       │   └── queue.go
+│       └── mmap
+│           └── README.md
+```
diff --git a/go.mod b/go.mod
index fa4fc28..71fd69d 100644
--- a/go.mod
+++ b/go.mod
@@ -2,6 +2,4 @@ module github.com/apache/skywalking-satellite
 
 go 1.14
 
-require (
-	github.com/sirupsen/logrus v1.7.0
-)
+require github.com/sirupsen/logrus v1.7.0
diff --git a/go.sum b/go.sum
index 4d54e45..4d74a1e 100644
--- a/go.sum
+++ b/go.sum
@@ -1,13 +1,10 @@
-github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
 github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
-github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
-github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
-github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4=
 golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
diff --git a/internal/pkg/event/event.go b/internal/pkg/event/event.go
new file mode 100644
index 0000000..c8ff0f2
--- /dev/null
+++ b/internal/pkg/event/event.go
@@ -0,0 +1,95 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package event
+
+import (
+	"fmt"
+	"time"
+)
+
+// The event type.
+const (
+	// Mapping to the type supported by SkyWalking OAP.
+	_ Type = iota
+	MetricsEvent
+	ProfilingEvent
+	SegmentEvent
+	ManagementEvent
+	MeterEvent
+	LogEvent
+)
+
+type Type int32
+
+// Event that implement this interface would be allowed to transmit in the Satellite.
+type Event interface {
+	// Name returns the event name.
+	Name() string
+
+	// Meta is a pair of key and value to record meta data, such as labels.
+	Meta() map[string]string
+
+	// Data returns the wrapped data.
+	Data() interface{}
+
+	// Time returns the event time.
+	Time() time.Time
+
+	// Type is to distinguish different events.
+	Type() Type
+
+	// IsRemote means is a output event when returns true.
+	IsRemote() bool
+}
+
+// SerializableEvent is used in Collector to bridge Queue.
+type SerializableEvent interface {
+	Event
+
+	// ToBytes serialize the event to a byte array.
+	ToBytes() []byte
+
+	// FromBytes deserialize the byte array to an event.
+	FromBytes(bytes []byte) SerializableEvent
+}
+
+// BatchEvents is used by Forwarder to forward.
+type BatchEvents []Event
+
+// OutputEventContext is a container to store the output context.
+type OutputEventContext struct {
+	context map[string]Event
+	Offset  int64
+}
+
+// Put puts the incoming event into the context when the event is a remote event.
+func (c *OutputEventContext) Put(event Event) {
+	if event.IsRemote() {
+		c.context[event.Name()] = event
+	}
+}
+
+// Get returns a event in the context. When the eventName does not exist, a error would be returned.
+func (c *OutputEventContext) Get(eventName string) (Event, error) {
+	e, ok := c.context[eventName]
+	if !ok {
+		err := fmt.Errorf("cannot find the event name in OutputEventContext : %s", eventName)
+		return nil, err
+	}
+	return e, nil
+}
diff --git a/internal/pkg/logger/log.go b/internal/pkg/logger/log.go
index 6021bf6..a62128d 100644
--- a/internal/pkg/logger/log.go
+++ b/internal/pkg/logger/log.go
@@ -62,6 +62,9 @@ func Init(opts ...Option) {
 		if f.timePattern == "" {
 			f.timePattern = defaultTimePattern
 		}
+		if !strings.Contains(f.logPattern, "\n") {
+			f.logPattern += "\n"
+		}
 		Log.SetFormatter(f)
 	})
 }
diff --git a/internal/pkg/logger/log_test.go b/internal/pkg/logger/log_test.go
index 649eea6..d8ad895 100644
--- a/internal/pkg/logger/log_test.go
+++ b/internal/pkg/logger/log_test.go
@@ -38,7 +38,7 @@ func TestFormatter_Format(t *testing.T) {
 	}{
 		{
 			name: "logWithEmptyFields",
-			want: []byte("[2020-12-12 12:12:12,012][trace][] - entry1"),
+			want: []byte("[2020-12-12 12:12:12,012][trace][] - entry1\n"),
 			args: args{
 				entry: func() *logrus.Entry {
 					entry := Log.WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
@@ -50,7 +50,7 @@ func TestFormatter_Format(t *testing.T) {
 		},
 		{
 			name: "logWithFields",
-			want: []byte("[2020-12-12 12:12:12,012][warning][a=b] - entry2"),
+			want: []byte("[2020-12-12 12:12:12,012][warning][a=b] - entry2\n"),
 			args: args{
 				entry: func() *logrus.Entry {
 					entry := Log.WithField("a", "b").WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
diff --git a/internal/pkg/plugin/define.go b/internal/pkg/plugin/define.go
new file mode 100644
index 0000000..ad95698
--- /dev/null
+++ b/internal/pkg/plugin/define.go
@@ -0,0 +1,84 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plugin
+
+// The following graph illustrates the relationship between different plugin interface in api package.
+//
+//
+//                   Gatherer                                Processor
+//       -------------------------------      -------------------------------------------
+//      |  -----------       ---------   |   |  -----------                 -----------  |
+//      | | Collector | ==> |  Queue   | |==>| |  Filter   | ==>  ...  ==> |  Filter   | |
+//      | | (Parser)  |     | Mem/File | |   |  -----------                 -----------  |
+//      |  -----------       ----------  |   |      ||                          ||       |
+//       --------------------------------    |      \/	                        \/       |
+//                                           |  ---------------------------------------  |
+//                                           | |             OutputEventContext        | |
+//                                           |  ---------------------------------------  |
+//                                            -------------------------------------------      -----------------
+//                                             ||                                       ----->| Sharing Client  |
+//                                             \/              Sender                  |       -----------------
+//                                             ----------------------------------------|-
+//                                            |  ---       ---                         | |
+//                                            | | B |     | D |     -----------------  | |
+//                                            | | A |     | I |    |Segment Forwarder|-| |
+//                                            | | T |     | S |    |    (Fallbacker) | | |
+//                                            | | C |     | P |     -----------------  | |
+//                                            | | H |  => | A |                        | | ===> Kakfa/OAP
+//                                            | | B |     | T | =>        ......       | |
+//                                            | | U |     | C |                        | |
+//                                            | | F |     | H |     -----------------  | |
+//                                            | | F |     | E |    | Meter  Forwarder|-| |
+//                                            | | E |     | R |    |     (Fallbacker | | |
+//                                            | | R |     |   |     -----------------  | |
+//                                            |  ---       ---                         | |
+//                                             ----------------------------------------
+//
+//
+// 1. The Collector plugin would fetch or receive the input data.
+// 2. The Parser plugin would parse the input data to SerializableEvent that is supported
+//    to be stored in Queue.
+// 3. The Queue plugin stores the SerializableEvent. However, whether serializing depends on
+//    the Queue implements. For example, the serialization is unnecessary when using a Memory
+//    Queue. Once an event is pulled by the consumer of Queue, the event will be processed by
+//    the filters in Processor.
+// 4. The Filter plugin would process the event to create a new event. Next, the event is passed
+//    to the next filter to do the same things until the whole filters are performed. All created
+//    events would be stored in the OutputEventContext. However, only the events labeled with
+//    RemoteEvent type would be forwarded by Forwarder.
+// 5. After processing, the events in OutputEventContext would be stored in the BatchBuffer. When
+//    the timer is triggered or the capacity limit is reached, the events in BatchBuffer would be
+//    partitioned by EventType and sent to the different Forwarders, such as Segment Forwarder and
+//    Meter Forwarder.
+// 6. The Follower in different Senders would share with the remote client to avoid make duplicate
+//    connections and have the same Fallbacker(FallBack strategy) to process data. When all
+//    forwarders send success or process success in Fallbacker, the dispatcher would also ack the
+//    batch is a success.
+
+// ============================================================================================
+//
+// There are four stages in the lifecycle of Satellite plugins, which are the initial phase,
+// preparing phase, running phase, and closing phase. In the running phase, each plugin has
+// its own interface definition. However, the other three phases have to be defined uniformly.
+
+type Plugin interface {
+	// Description returns the description of the specific plugin.
+	Description() string
+	// Init initialize the specific plugin.
+	InitPlugin(config map[string]interface{})
+}
diff --git a/internal/pkg/plugin/registry.go b/internal/pkg/plugin/registry.go
new file mode 100644
index 0000000..4731d36
--- /dev/null
+++ b/internal/pkg/plugin/registry.go
@@ -0,0 +1,79 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plugin
+
+import (
+	"fmt"
+	"reflect"
+	"sync"
+)
+
+// All plugins is wrote in ./plugins dir. The plugin type would be as the next level dirs,
+// such as collector, client, or queue. And the 3rd level is the plugin name, that is also
+// used as key in pluginRegistry.
+
+// reg is the global plugin registry
+var (
+	reg  map[reflect.Type]map[string]reflect.Value
+	lock sync.Mutex
+)
+
+func init() {
+	reg = make(map[reflect.Type]map[string]reflect.Value)
+}
+
+// Add new plugin category. The different plugin category could have same plugin names.
+func AddPluginCategory(pluginCategory reflect.Type) {
+	lock.Lock()
+	defer lock.Unlock()
+	reg[pluginCategory] = map[string]reflect.Value{}
+}
+
+// RegisterPlugin registers the pluginType as plugin.
+// If the plugin is a pointer receiver, please pass a pointer. Otherwise, please pass a value.
+func RegisterPlugin(pluginName string, plugin interface{}) {
+	lock.Lock()
+	defer lock.Unlock()
+	v := reflect.ValueOf(plugin)
+	success := false
+	for pCategory, pReg := range reg {
+		if v.Type().Implements(pCategory) {
+			pReg[pluginName] = v
+			fmt.Printf("register %s %s successfully ", pluginName, v.Type().String())
+			success = true
+		}
+	}
+	if !success {
+		fmt.Printf("this type of %s is not supported to register : %s", pluginName, v.Type().String())
+	}
+}
+
+// Get the specific plugin according to the pluginCategory and pluginName.
+func Get(pluginCategory reflect.Type, pluginName string, config map[string]interface{}) Plugin {
+	value, ok := reg[pluginCategory][pluginName]
+	if !ok {
+		panic(fmt.Errorf("cannot find %s plugin, and the category of plugin is %s", pluginName, pluginCategory))
+	}
+	t := value.Type()
+	if t.Kind() == reflect.Ptr {
+		t = t.Elem()
+	}
+	plugin := reflect.New(t).Interface().(Plugin)
+	plugin.InitPlugin(config)
+	return plugin
+}
diff --git a/internal/satellite/event/event.go b/internal/satellite/event/event.go
new file mode 100644
index 0000000..a53720b
--- /dev/null
+++ b/internal/satellite/event/event.go
@@ -0,0 +1,109 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package event
+
+import (
+	"time"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+)
+
+// Event defines the common fields.
+type Event struct {
+	name      string
+	timestamp time.Time
+	meta      map[string]string
+	eventType event.Type
+	remote    bool
+}
+
+// ETBFunc serialize event to bytes.
+type ETBFunc func(event event.SerializableEvent) []byte
+
+// BToFunc deserialize bytes to event.
+type BToFunc func(bytes []byte) event.SerializableEvent
+
+// StructuredEvent works when the data is a struct type.
+type StructuredEvent struct {
+	Event
+	data interface{}
+}
+
+// StructuredEvent works when the data is not a struct type.
+type UnstructuredEvent struct {
+	Event
+	data map[string]interface{}
+}
+
+// StructuredEvent works when the data is a struct type in the collector.
+type StructuredSerializableEvent struct {
+	StructuredEvent
+	etb ETBFunc
+	bte BToFunc
+}
+
+// StructuredEvent works when the data is not a struct type in the collector.
+type UnstructuredSerializableEvent struct {
+	UnstructuredEvent
+	etb ETBFunc
+	bte BToFunc
+}
+
+func (s *Event) Name() string {
+	return s.name
+}
+
+func (s *Event) Meta() map[string]string {
+	return s.meta
+}
+
+func (s *Event) Time() time.Time {
+	return s.timestamp
+}
+
+func (s *Event) Type() event.Type {
+	return s.eventType
+}
+
+func (s *Event) IsRemote() bool {
+	return s.remote
+}
+
+func (u *StructuredEvent) Data() interface{} {
+	return u.data
+}
+
+func (u *UnstructuredEvent) Data() interface{} {
+	return u.data
+}
+
+func (s *StructuredSerializableEvent) ToBytes() []byte {
+	return s.etb(s)
+}
+
+func (s *StructuredSerializableEvent) FromBytes(bytes []byte) event.SerializableEvent {
+	return s.bte(bytes)
+}
+
+func (u *UnstructuredSerializableEvent) ToBytes() []byte {
+	return u.etb(u)
+}
+
+func (u *UnstructuredSerializableEvent) FromBytes(bytes []byte) event.SerializableEvent {
+	return u.bte(bytes)
+}
diff --git a/plugins/client/api/client.go b/plugins/client/api/client.go
new file mode 100644
index 0000000..51d2b65
--- /dev/null
+++ b/plugins/client/api/client.go
@@ -0,0 +1,47 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package api
+
+import (
+	"reflect"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+// Client is a plugin interface, that defines new clients, such as gRPC client and Kafka client.
+type Client interface {
+	plugin.Plugin
+
+	// Prepare would make connection with outer service.
+	Prepare()
+	// GetConnection returns the connected client to publish events.
+	GetConnectedClient() interface{}
+	// Close the connection with outer service.
+	Close()
+}
+
+var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
+
+// Get client plugin.
+func GetClient(pluginName string, config map[string]interface{}) Client {
+	return plugin.Get(ClientCategory, pluginName, config).(Client)
+}
+
+func init() {
+	plugin.AddPluginCategory(ClientCategory)
+}
diff --git a/plugins/client/example/client.go b/plugins/client/example/client.go
new file mode 100644
index 0000000..77b29c7
--- /dev/null
+++ b/plugins/client/example/client.go
@@ -0,0 +1,67 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+type demoClient struct {
+	a string
+}
+type demoClient2 struct {
+	a string
+}
+type demoClient3 struct {
+	a string
+}
+
+func (d demoClient) Description() string {
+	panic("implement me")
+}
+
+func (d demoClient) InitPlugin(config map[string]interface{}) {
+
+}
+
+func (d demoClient) Prepare() {
+	panic("implement me")
+}
+
+func (d demoClient) GetConnectedClient() interface{} {
+	panic("implement me")
+}
+
+func (d demoClient) Close() {
+	panic("implement me")
+}
+
+func (d *demoClient2) Description() string {
+	panic("implement me")
+}
+
+func (d *demoClient2) InitPlugin(config map[string]interface{}) {
+}
+
+func (d *demoClient2) Prepare() {
+	panic("implement me")
+}
+
+func (d *demoClient2) GetConnectedClient() interface{} {
+	panic("implement me")
+}
+
+func (d *demoClient2) Close() {
+	panic("implement me")
+}
diff --git a/plugins/client/example/client_test.go b/plugins/client/example/client_test.go
new file mode 100644
index 0000000..a9e355e
--- /dev/null
+++ b/plugins/client/example/client_test.go
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import (
+	"testing"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/plugins/client/api"
+)
+
+func Test_Register(t *testing.T) {
+	tests := []struct {
+		name  string
+		args  interface{}
+		panic bool
+	}{
+		{
+			name: "demoClient",
+			args: demoClient{
+				a: "s",
+			},
+			panic: false,
+		},
+		{
+			name: "demoClient2",
+			args: &demoClient2{
+				a: "s",
+			},
+			panic: false,
+		},
+		{
+			name: "demoClient3",
+			args: demoClient3{
+				a: "s",
+			},
+			panic: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			plugin.RegisterPlugin(tt.name, tt.args)
+			assertPanic(t, tt.name, nil, tt.panic)
+		})
+	}
+}
+
+func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
+	defer func() {
+		if r := recover(); r != nil && !existPanic {
+			t.Errorf("the plugin %s is not pass", name)
+		}
+	}()
+	api.GetClient(name, config)
+}
diff --git a/plugins/client/grpc/README.md b/plugins/client/grpc/README.md
new file mode 100644
index 0000000..3f03ea1
--- /dev/null
+++ b/plugins/client/grpc/README.md
@@ -0,0 +1 @@
+# Plugin description
\ No newline at end of file
diff --git a/plugins/client/kakka/README.md b/plugins/client/kakka/README.md
new file mode 100644
index 0000000..3f03ea1
--- /dev/null
+++ b/plugins/client/kakka/README.md
@@ -0,0 +1 @@
+# Plugin description
\ No newline at end of file
diff --git a/plugins/collector/api/collector.go b/plugins/collector/api/collector.go
new file mode 100644
index 0000000..374d63a
--- /dev/null
+++ b/plugins/collector/api/collector.go
@@ -0,0 +1,59 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package api
+
+import (
+	"reflect"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+//   Init()     Initial stage: Init plugin by config
+//    ||
+//    \/
+//   Prepare()   Preparing stage: Prepare the collector, such as build connection with SkyWalking javaagent.
+//    ||
+//    \/
+//   Next()     Running stage: When Collector collect a data, the data would be fetched by the upstream
+//    ||                       component through this method.
+//    \/
+//   Close()    Closing stage: Close the Collector, such as close connection with SkyWalking javaagent.
+
+// Collector is a plugin interface, that defines new collectors.
+type Collector interface {
+	plugin.Plugin
+
+	// Prepare creates a listen or reader to gather data.
+	Prepare()
+	// Next return the data from the input.
+	Next() (event.SerializableEvent, error)
+	// Close would close collector.
+	Close()
+}
+
+var CollectorCategory = reflect.TypeOf((*Collector)(nil)).Elem()
+
+// Get collector plugin.
+func GetCollector(pluginName string, config map[string]interface{}) Collector {
+	return plugin.Get(CollectorCategory, pluginName, config).(Collector)
+}
+
+func init() {
+	plugin.AddPluginCategory(CollectorCategory)
+}
diff --git a/plugins/collector/example/collector.go b/plugins/collector/example/collector.go
new file mode 100644
index 0000000..40b9b8b
--- /dev/null
+++ b/plugins/collector/example/collector.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import "github.com/apache/skywalking-satellite/internal/pkg/event"
+
+type demoCollector struct {
+	a string
+}
+
+type demoCollector2 struct {
+	a string
+}
+
+type demoCollector3 struct {
+	a string
+}
+
+func (d *demoCollector) Description() string {
+	panic("implement me")
+}
+
+func (d *demoCollector) InitPlugin(config map[string]interface{}) {
+}
+
+func (d *demoCollector) Prepare() {
+	panic("implement me")
+}
+
+func (d *demoCollector) Next() (event.SerializableEvent, error) {
+	panic("implement me")
+}
+
+func (d *demoCollector) Close() {
+	panic("implement me")
+}
+
+func (d demoCollector2) Description() string {
+	panic("implement me")
+}
+
+func (d demoCollector2) InitPlugin(config map[string]interface{}) {
+}
+
+func (d demoCollector2) Prepare() {
+	panic("implement me")
+}
+
+func (d demoCollector2) Next() (event.SerializableEvent, error) {
+	panic("implement me")
+}
+
+func (d demoCollector2) Close() {
+	panic("implement me")
+}
diff --git a/plugins/collector/example/collector_test.go b/plugins/collector/example/collector_test.go
new file mode 100644
index 0000000..e73f3bd
--- /dev/null
+++ b/plugins/collector/example/collector_test.go
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import (
+	"testing"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/plugins/collector/api"
+)
+
+func Test_Register(t *testing.T) {
+	tests := []struct {
+		name  string
+		args  interface{}
+		panic bool
+	}{
+		{
+			name: "demoCollector",
+			args: &demoCollector{
+				a: "s",
+			},
+			panic: false,
+		},
+		{
+			name: "demoCollector2",
+			args: demoCollector2{
+				a: "s",
+			},
+			panic: false,
+		},
+		{
+			name: "demoCollector3",
+			args: demoCollector3{
+				a: "s",
+			},
+			panic: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			plugin.RegisterPlugin(tt.name, tt.args)
+			assertPanic(t, tt.name, nil, tt.panic)
+		})
+	}
+}
+
+func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
+	defer func() {
+		if r := recover(); r != nil && !existPanic {
+			t.Errorf("the plugin %s is not pass", name)
+		}
+	}()
+	api.GetCollector(name, config)
+}
diff --git a/plugins/collector/log-grpc/README.md b/plugins/collector/log-grpc/README.md
new file mode 100644
index 0000000..3f03ea1
--- /dev/null
+++ b/plugins/collector/log-grpc/README.md
@@ -0,0 +1 @@
+# Plugin description
\ No newline at end of file
diff --git a/plugins/fallbacker/api/fallbacker.go b/plugins/fallbacker/api/fallbacker.go
new file mode 100644
index 0000000..bde56ef
--- /dev/null
+++ b/plugins/fallbacker/api/fallbacker.go
@@ -0,0 +1,44 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package api
+
+import (
+	"reflect"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+// Fallbacker is a plugin interface, that defines some fallback strategies.
+type Fallbacker interface {
+	plugin.Plugin
+
+	//  FallBack returns nil when finishing a successful process and returns a new Fallbacker when failure.
+	FallBack(batch event.BatchEvents) Fallbacker
+}
+
+var FallbackerCategory = reflect.TypeOf((*Fallbacker)(nil)).Elem()
+
+// Get Fallbacker plugin.
+func GetFallbacker(pluginName string, config map[string]interface{}) Fallbacker {
+	return plugin.Get(FallbackerCategory, pluginName, config).(Fallbacker)
+}
+
+func init() {
+	plugin.AddPluginCategory(FallbackerCategory)
+}
diff --git a/plugins/fallbacker/example/fallbacker.go b/plugins/fallbacker/example/fallbacker.go
new file mode 100644
index 0000000..b933e96
--- /dev/null
+++ b/plugins/fallbacker/example/fallbacker.go
@@ -0,0 +1,57 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import (
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/plugins/fallbacker/api"
+)
+
+type demoFallbacker struct {
+	a string
+}
+
+type demoFallbacker2 struct {
+	a string
+}
+
+type demoFallbacker3 struct {
+	a string
+}
+
+func (d *demoFallbacker) Description() string {
+	panic("implement me")
+}
+
+func (d *demoFallbacker) InitPlugin(config map[string]interface{}) {
+}
+
+func (d *demoFallbacker) FallBack(batch event.BatchEvents) api.Fallbacker {
+	panic("implement me")
+}
+
+func (d demoFallbacker2) Description() string {
+	panic("implement me")
+}
+
+func (d demoFallbacker2) InitPlugin(config map[string]interface{}) {
+}
+
+func (d demoFallbacker2) FallBack(batch event.BatchEvents) api.Fallbacker {
+	panic("implement me")
+}
diff --git a/plugins/fallbacker/example/fallbacker_test.go b/plugins/fallbacker/example/fallbacker_test.go
new file mode 100644
index 0000000..f5f445f
--- /dev/null
+++ b/plugins/fallbacker/example/fallbacker_test.go
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import (
+	"testing"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/plugins/fallbacker/api"
+)
+
+func Test_Register(t *testing.T) {
+	tests := []struct {
+		name  string
+		args  interface{}
+		panic bool
+	}{
+		{
+			name: "demoFallbacker",
+			args: &demoFallbacker{
+				a: "s",
+			},
+			panic: false,
+		},
+		{
+			name: "demoFallbacker2",
+			args: demoFallbacker2{
+				a: "s",
+			},
+			panic: false,
+		},
+		{
+			name: "demoFallbacker3",
+			args: demoFallbacker3{
+				a: "s",
+			},
+			panic: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			plugin.RegisterPlugin(tt.name, tt.args)
+			assertPanic(t, tt.name, nil, tt.panic)
+		})
+	}
+}
+
+func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
+	defer func() {
+		if r := recover(); r != nil && !existPanic {
+			t.Errorf("the plugin %s is not pass", name)
+		}
+	}()
+	api.GetFallbacker(name, config)
+}
diff --git a/plugins/filter/api/filter.go b/plugins/filter/api/filter.go
new file mode 100644
index 0000000..0ed43db
--- /dev/null
+++ b/plugins/filter/api/filter.go
@@ -0,0 +1,50 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package api
+
+import (
+	"reflect"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+//   Init()        Initiating stage: Init plugin by config
+//    ||
+//    \/
+//   Process()     Running stage:    Process the input event to convert to new event. During the processing,
+//                                   the method should also tag event type to mark the event category.
+
+// Filter is a plugin interface, that defines new pipeline filters.
+type Filter interface {
+	plugin.Plugin
+
+	// Process produces a new event by processing incoming event.
+	Process(in event.Event) event.Event
+}
+
+var FilterCategory = reflect.TypeOf((*Filter)(nil)).Elem()
+
+// Get filter plugin.
+func GetFilter(pluginName string, config map[string]interface{}) Filter {
+	return plugin.Get(FilterCategory, pluginName, config).(Filter)
+}
+
+func init() {
+	plugin.AddPluginCategory(FilterCategory)
+}
diff --git a/plugins/filter/example/filter.go b/plugins/filter/example/filter.go
new file mode 100644
index 0000000..14abb69
--- /dev/null
+++ b/plugins/filter/example/filter.go
@@ -0,0 +1,54 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import "github.com/apache/skywalking-satellite/internal/pkg/event"
+
+type demoFilter struct {
+	a string
+}
+
+type demoFilter2 struct {
+	a string
+}
+
+type demoFilter3 struct {
+	a string
+}
+
+func (d *demoFilter) Description() string {
+	panic("implement me")
+}
+
+func (d *demoFilter) InitPlugin(config map[string]interface{}) {
+}
+
+func (d *demoFilter) Process(in event.Event) event.Event {
+	panic("implement me")
+}
+
+func (d demoFilter2) Description() string {
+	panic("implement me")
+}
+
+func (d demoFilter2) InitPlugin(config map[string]interface{}) {
+}
+
+func (d demoFilter2) Process(in event.Event) event.Event {
+	panic("implement me")
+}
diff --git a/plugins/filter/example/filter_test.go b/plugins/filter/example/filter_test.go
new file mode 100644
index 0000000..aae4371
--- /dev/null
+++ b/plugins/filter/example/filter_test.go
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import (
+	"testing"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/plugins/filter/api"
+)
+
+func Test_Register(t *testing.T) {
+	tests := []struct {
+		name  string
+		args  interface{}
+		panic bool
+	}{
+		{
+			name: "demoFilter",
+			args: &demoFilter{
+				a: "s",
+			},
+			panic: false,
+		},
+		{
+			name: "demoFilter2",
+			args: demoFilter2{
+				a: "s",
+			},
+			panic: false,
+		},
+		{
+			name: "demoFilter3",
+			args: demoFilter3{
+				a: "s",
+			},
+			panic: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			plugin.RegisterPlugin(tt.name, tt.args)
+			assertPanic(t, tt.name, nil, tt.panic)
+		})
+	}
+}
+
+func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
+	defer func() {
+		if r := recover(); r != nil && !existPanic {
+			t.Errorf("the plugin %s is not pass", name)
+		}
+	}()
+	api.GetFilter(name, config)
+}
diff --git a/plugins/filter/sampling/README.md b/plugins/filter/sampling/README.md
new file mode 100644
index 0000000..3f03ea1
--- /dev/null
+++ b/plugins/filter/sampling/README.md
@@ -0,0 +1 @@
+# Plugin description
\ No newline at end of file
diff --git a/plugins/forwarder/api/forwarder.go b/plugins/forwarder/api/forwarder.go
new file mode 100644
index 0000000..9e99f14
--- /dev/null
+++ b/plugins/forwarder/api/forwarder.go
@@ -0,0 +1,57 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package api
+
+import (
+	"reflect"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+//   Init()     Initiating stage: Init plugin by config
+//    ||
+//    \/
+//   Prepare()   Preparing stage: Prepare the Forwarder, such as get remote client.
+//    ||
+//    \/
+//   Forward()  Running stage: Forward the batch events
+//    ||
+//    \/
+//   Close()    Closing stage: Close the Collector, such as close connection with SkyWalking javaagent.
+
+// Forwarder is a plugin interface, that defines new forwarders.
+type Forwarder interface {
+	plugin.Plugin
+
+	// Forward the batch events to the external services, such as Kafka MQ and SkyWalking OAP cluster.
+	Forward(batch event.BatchEvents)
+
+	// ForwardType returns the supporting event type that could be forwarded.
+	ForwardType() event.Type
+}
+
+var ForwarderCategory = reflect.TypeOf((*Forwarder)(nil)).Elem()
+
+func GetForwarder(pluginName string, config map[string]interface{}) Forwarder {
+	return plugin.Get(ForwarderCategory, pluginName, config).(Forwarder)
+}
+
+func init() {
+	plugin.AddPluginCategory(ForwarderCategory)
+}
diff --git a/plugins/forwarder/example/forwarder.go b/plugins/forwarder/example/forwarder.go
new file mode 100644
index 0000000..637f9e1
--- /dev/null
+++ b/plugins/forwarder/example/forwarder.go
@@ -0,0 +1,62 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import "github.com/apache/skywalking-satellite/internal/pkg/event"
+
+type demoForwarder struct {
+	a string
+}
+
+type demoForwarder2 struct {
+	a string
+}
+
+type demoForwarder3 struct {
+	a string
+}
+
+func (d *demoForwarder) Description() string {
+	panic("implement me")
+}
+
+func (d *demoForwarder) InitPlugin(config map[string]interface{}) {
+}
+
+func (d *demoForwarder) Forward(batch event.BatchEvents) {
+	panic("implement me")
+}
+
+func (d demoForwarder2) Description() string {
+	panic("implement me")
+}
+
+func (d demoForwarder2) InitPlugin(config map[string]interface{}) {
+}
+
+func (d demoForwarder2) Forward(batch event.BatchEvents) {
+	panic("implement me")
+}
+
+func (d demoForwarder2) ForwardType() event.Type {
+	panic("implement me")
+}
+
+func (d *demoForwarder) ForwardType() event.Type {
+	panic("implement me")
+}
diff --git a/plugins/forwarder/example/forwarder_test.go b/plugins/forwarder/example/forwarder_test.go
new file mode 100644
index 0000000..28de35d
--- /dev/null
+++ b/plugins/forwarder/example/forwarder_test.go
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import (
+	"testing"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/plugins/forwarder/api"
+)
+
+func Test_Register(t *testing.T) {
+	tests := []struct {
+		name  string
+		args  interface{}
+		panic bool
+	}{
+		{
+			name: "demoForwarder",
+			args: &demoForwarder{
+				a: "s",
+			},
+			panic: false,
+		},
+		{
+			name: "demoForwarder2",
+			args: demoForwarder2{
+				a: "s",
+			},
+			panic: false,
+		},
+		{
+			name: "demoForwarder3",
+			args: demoForwarder3{
+				a: "s",
+			},
+			panic: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			plugin.RegisterPlugin(tt.name, tt.args)
+			assertPanic(t, tt.name, nil, tt.panic)
+		})
+	}
+}
+
+func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
+	defer func() {
+		if r := recover(); r != nil && !existPanic {
+			t.Errorf("the plugin %s is not pass", name)
+		}
+	}()
+	api.GetForwarder(name, config)
+}
diff --git a/plugins/forwarder/segment/README.md b/plugins/forwarder/segment/README.md
new file mode 100644
index 0000000..3f03ea1
--- /dev/null
+++ b/plugins/forwarder/segment/README.md
@@ -0,0 +1 @@
+# Plugin description
\ No newline at end of file
diff --git a/plugins/parser/api/parser.go b/plugins/parser/api/parser.go
new file mode 100644
index 0000000..3ca5ebe
--- /dev/null
+++ b/plugins/parser/api/parser.go
@@ -0,0 +1,49 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package api
+
+import (
+	"reflect"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+//
+// Collector ==> RawData ==> Parser ==> SerializableEvent
+//
+// Parser is a plugin interface, that defines new Parsers for Collector plugin.
+type Parser interface {
+	plugin.Plugin
+
+	// ParseBytes parse the byte buffer into events.
+	ParseBytes(bytes []byte) ([]event.SerializableEvent, error)
+
+	// ParseStr parse the string into events.
+	ParseStr(str string) ([]event.SerializableEvent, error)
+}
+
+var ParserCategory = reflect.TypeOf((*Parser)(nil)).Elem()
+
+func GetParser(pluginName string, config map[string]interface{}) Parser {
+	return plugin.Get(ParserCategory, pluginName, config).(Parser)
+}
+
+func init() {
+	plugin.AddPluginCategory(ParserCategory)
+}
diff --git a/plugins/parser/example/parser.go b/plugins/parser/example/parser.go
new file mode 100644
index 0000000..839956f
--- /dev/null
+++ b/plugins/parser/example/parser.go
@@ -0,0 +1,62 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import "github.com/apache/skywalking-satellite/internal/pkg/event"
+
+type demoParser struct {
+	a string
+}
+
+type demoParser2 struct {
+	a string
+}
+
+type demoParser3 struct {
+	a string
+}
+
+func (d *demoParser) Description() string {
+	panic("implement me")
+}
+
+func (d *demoParser) InitPlugin(config map[string]interface{}) {
+}
+
+func (d *demoParser) ParseBytes(bytes []byte) ([]event.SerializableEvent, error) {
+	panic("implement me")
+}
+
+func (d *demoParser) ParseStr(str string) ([]event.SerializableEvent, error) {
+	panic("implement me")
+}
+
+func (d demoParser2) Description() string {
+	panic("implement me")
+}
+
+func (d demoParser2) InitPlugin(config map[string]interface{}) {
+}
+
+func (d demoParser2) ParseBytes(bytes []byte) ([]event.SerializableEvent, error) {
+	panic("implement me")
+}
+
+func (d demoParser2) ParseStr(str string) ([]event.SerializableEvent, error) {
+	panic("implement me")
+}
diff --git a/plugins/parser/example/parser_test.go b/plugins/parser/example/parser_test.go
new file mode 100644
index 0000000..b577f7d
--- /dev/null
+++ b/plugins/parser/example/parser_test.go
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import (
+	"testing"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/plugins/parser/api"
+)
+
+func Test_Register(t *testing.T) {
+	tests := []struct {
+		name  string
+		args  interface{}
+		panic bool
+	}{
+		{
+			name: "demoParser",
+			args: &demoParser{
+				a: "s",
+			},
+			panic: false,
+		},
+		{
+			name: "demoParser2",
+			args: demoParser2{
+				a: "s",
+			},
+			panic: false,
+		},
+		{
+			name: "demoParser3",
+			args: demoParser3{
+				a: "s",
+			},
+			panic: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			plugin.RegisterPlugin(tt.name, tt.args)
+			assertPanic(t, tt.name, nil, tt.panic)
+		})
+	}
+}
+
+func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
+	defer func() {
+		if r := recover(); r != nil && !existPanic {
+			t.Errorf("the plugin %s is not pass", name)
+		}
+	}()
+	api.GetParser(name, config)
+}
diff --git a/plugins/parser/gork/README.md b/plugins/parser/gork/README.md
new file mode 100644
index 0000000..3f03ea1
--- /dev/null
+++ b/plugins/parser/gork/README.md
@@ -0,0 +1 @@
+# Plugin description
\ No newline at end of file
diff --git a/plugins/queue/api/queue.go b/plugins/queue/api/queue.go
new file mode 100644
index 0000000..9b95156
--- /dev/null
+++ b/plugins/queue/api/queue.go
@@ -0,0 +1,61 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package api
+
+import (
+	"reflect"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+// Queue is a plugin interface, that defines new queues.
+type Queue interface {
+	plugin.Plugin
+
+	// Publisher get the only publisher for the current queue.
+	Publisher() QueuePublisher
+
+	// Consumer get the only consumer for the current queue.
+	Consumer() QueueConsumer
+
+	// Close would close the queue.
+	Close()
+}
+
+// QueuePublisher is a plugin interface, that defines new queue publishers.
+type QueuePublisher interface {
+	// Enqueue push a inputEvent into the queue.
+	Enqueue(event *event.SerializableEvent) error
+}
+
+// QueueConsumer is a plugin interface, that defines new queue consumers.
+type QueueConsumer interface {
+	// Dequeue pop an event form the Queue. When the queue is empty, the method would be blocked.
+	Dequeue() (event *event.SerializableEvent, offset int64, err error)
+}
+
+var QueueCategory = reflect.TypeOf((*Queue)(nil)).Elem()
+
+func GetQueue(pluginName string, config map[string]interface{}) Queue {
+	return plugin.Get(QueueCategory, pluginName, config).(Queue)
+}
+
+func init() {
+	plugin.AddPluginCategory(QueueCategory)
+}
diff --git a/plugins/queue/example/queue.go b/plugins/queue/example/queue.go
new file mode 100644
index 0000000..5a3055a
--- /dev/null
+++ b/plugins/queue/example/queue.go
@@ -0,0 +1,72 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import (
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+)
+
+type demoQueue struct {
+	a string
+}
+
+type demoQueue2 struct {
+	a string
+}
+
+type demoQueue3 struct {
+	a string
+}
+
+func (d *demoQueue) Description() string {
+	panic("implement me")
+}
+
+func (d *demoQueue) InitPlugin(config map[string]interface{}) {
+}
+
+func (d *demoQueue) Publisher() api.QueuePublisher {
+	panic("implement me")
+}
+
+func (d *demoQueue) Consumer() api.QueueConsumer {
+	panic("implement me")
+}
+
+func (d *demoQueue) Close() {
+	panic("implement me")
+}
+
+func (d demoQueue2) Description() string {
+	panic("implement me")
+}
+
+func (d demoQueue2) InitPlugin(config map[string]interface{}) {
+}
+
+func (d demoQueue2) Publisher() api.QueuePublisher {
+	panic("implement me")
+}
+
+func (d demoQueue2) Consumer() api.QueueConsumer {
+	panic("implement me")
+}
+
+func (d demoQueue2) Close() {
+	panic("implement me")
+}
diff --git a/plugins/queue/example/queue_test.go b/plugins/queue/example/queue_test.go
new file mode 100644
index 0000000..0afd20d
--- /dev/null
+++ b/plugins/queue/example/queue_test.go
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package example
+
+import (
+	"testing"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+)
+
+func Test_Register(t *testing.T) {
+	tests := []struct {
+		name  string
+		args  interface{}
+		panic bool
+	}{
+		{
+			name: "demoQueue",
+			args: &demoQueue{
+				a: "s",
+			},
+			panic: false,
+		},
+		{
+			name: "demoQueue2",
+			args: demoQueue2{
+				a: "s",
+			},
+			panic: false,
+		},
+		{
+			name: "demoQueue3",
+			args: demoQueue3{
+				a: "s",
+			},
+			panic: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			plugin.RegisterPlugin(tt.name, tt.args)
+			assertPanic(t, tt.name, nil, tt.panic)
+		})
+	}
+}
+
+func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
+	defer func() {
+		if r := recover(); r != nil && !existPanic {
+			t.Errorf("the plugin %s is not pass", name)
+		}
+	}()
+	api.GetQueue(name, config)
+}
diff --git a/plugins/queue/mmap/README.md b/plugins/queue/mmap/README.md
new file mode 100644
index 0000000..3f03ea1
--- /dev/null
+++ b/plugins/queue/mmap/README.md
@@ -0,0 +1 @@
+# Plugin description
\ No newline at end of file