You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by li...@apache.org on 2020/12/16 05:54:19 UTC

[skywalking-satellite] branch main updated: Add main structure (#8)

This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git


The following commit(s) were added to refs/heads/main by this push:
     new bad4718  Add main structure (#8)
bad4718 is described below

commit bad4718b0431e27f6f670bd067a2dab9d2513118
Author: Evan <31...@users.noreply.github.com>
AuthorDate: Wed Dec 16 13:53:41 2020 +0800

    Add main structure (#8)
    
    * enhance the plugin mechanism
    
    * remove absent build phase
    
    * Aggregate registry & fix some comments
    
    * fix some comments
    
    * fix some comments
    
    * add module
    
    * add licenses
    
    * add build in makefile
    
    * refactoring(add server concept) & polish codes & add docs
    
    * fix log comments
    
    * Remove unclear concepts & polish docs
    
    * rename go file
    
    * polish doc
    
    * polish the chore codes
    
    * fix api bug in sharing plugins
    
    * remove lock in plugins and polish docs
    
    * remove unnecessary license
    
    * polish license file
    
    Co-authored-by: Evan <ev...@outlook.com>
    Co-authored-by: Zhenxu Ke <ke...@apache.org>
---
 Makefile                                           |   9 +
 main.go => cmd/command.go                          |  33 ++-
 main.go => cmd/main.go                             |  22 +-
 configs/{config.yaml => satellite_config.yaml}     |  64 ++---
 dist/LICENSE                                       |   4 +-
 dist/licenses/LICENSE-cli                          |  21 ++
 dist/licenses/LICENSE-viper                        |  21 ++
 docs/design/module_design.md                       | 128 +++++++++
 docs/design/module_structure.md                    |  25 ++
 docs/design/plugin_structure.md                    |  44 +++
 docs/project_structue.md                           | 117 ++++----
 go.mod                                             |   7 +-
 go.sum                                             | 297 +++++++++++++++++++++
 internal/pkg/event/event.go                        |  11 +-
 internal/pkg/log/log.go                            | 146 ++++++++++
 internal/pkg/{logger => log}/log_test.go           |  44 ++-
 internal/pkg/logger/log.go                         | 106 --------
 internal/pkg/plugin/define.go                      |  84 ------
 .../client.go => internal/pkg/plugin/definition.go |  47 ++--
 internal/pkg/plugin/plugin_test.go                 | 101 +++++++
 internal/pkg/plugin/registry.go                    |  72 +++--
 internal/satellite/boot/boot.go                    | 149 +++++++++++
 internal/satellite/config/loader.go                |  69 +++++
 internal/satellite/config/loader_test.go           | 136 ++++++++++
 internal/satellite/config/satellite_config.go      |  48 ++++
 main.go => internal/satellite/module/api/module.go |  24 +-
 internal/satellite/module/buffer/buffer.go         |  81 ++++++
 .../satellite/module/buffer/buffer_test.go         |  62 +++--
 .../satellite/module/gatherer/api/config.go        |  33 +--
 .../satellite/module/gatherer/api/gatherer.go      |  17 +-
 internal/satellite/module/gatherer/create.go       |  61 +++++
 .../satellite/module/gatherer/fetcher_gatherer.go  |  89 ++++++
 .../satellite/module/gatherer/receiver_gatherer.go |  92 +++++++
 .../satellite/module/processor/api/config.go       |  14 +-
 .../satellite/module/processor/api/processor.go    |   9 +-
 internal/satellite/module/processor/create.go      |  41 +++
 internal/satellite/module/processor/processor.go   |  82 ++++++
 .../satellite/module/sender/api/config.go          |  32 +--
 .../satellite/module/sender/api/sender.go          |  14 +-
 internal/satellite/module/sender/create.go         |  51 ++++
 internal/satellite/module/sender/sender.go         | 163 +++++++++++
 internal/satellite/sharing/sharing_plugins.go      |  67 +++++
 plugins/client/api/client.go                       |  35 ++-
 main.go => plugins/client/api/client_repository.go |  24 +-
 plugins/collector/api/collector.go                 |  59 ----
 plugins/fallbacker/api/fallbacker.go               |  17 +-
 .../api/fallbacker_repository.go}                  |  33 +--
 plugins/fallbacker/timer/timer_fallbacker.go       |  64 +++++
 main.go => plugins/fetcher/api/fetcher.go          |  15 +-
 .../api/fetcher_repository.go}                     |  31 +--
 plugins/filter/api/filter.go                       |  23 +-
 .../client.go => filter/api/filter_repository.go}  |  31 +--
 plugins/forwarder/api/forwarder.go                 |  30 +--
 .../api/forwarder_repository.go}                   |  31 +--
 plugins/init.go                                    |  45 ++++
 plugins/parser/api/parser.go                       |  15 --
 .../client.go => parser/api/parser_repository.go}  |  31 +--
 plugins/queue/api/queue.go                         |  40 +--
 main.go => plugins/queue/api/queue_repository.go   |  24 +-
 .../api/fallbacker.go => receiver/api/receiver.go} |  23 +-
 .../api/receiver_repository.go}                    |  31 +--
 plugins/{collector => receiver}/log-grpc/README.md |   0
 main.go => plugins/server/api/server.go            |  12 +-
 main.go => plugins/server/api/server_repository.go |  24 +-
 64 files changed, 2644 insertions(+), 731 deletions(-)

diff --git a/Makefile b/Makefile
index 7e0cf4a..dbbc0e8 100644
--- a/Makefile
+++ b/Makefile
@@ -70,3 +70,12 @@ verify: clean license lint test
 .PHONY: clean
 clean: tools
 	-rm -rf coverage.txt
+
+.PHONY: build
+build: deps windows linux darwin
+
+
+.PHONY: $(PLATFORMS)
+$(PLATFORMS):
+	mkdir -p $(OUT_DIR)
+	GOOS=$(os) GOARCH=$(ARCH) $(GO_BUILD) $(GO_BUILD_FLAGS) -ldflags "$(GO_BUILD_LDFLAGS)" -o $(OUT_DIR)/$(BINARY)-$(VERSION)-$(os)-$(ARCH) ./cmd
diff --git a/main.go b/cmd/command.go
similarity index 54%
copy from main.go
copy to cmd/command.go
index 94fbc40..0fe3c18 100644
--- a/main.go
+++ b/cmd/command.go
@@ -17,6 +17,35 @@
 
 package main
 
-func main() {
-	print("OK")
+import (
+	"github.com/urfave/cli/v2"
+
+	"github.com/apache/skywalking-satellite/internal/satellite/boot"
+	"github.com/apache/skywalking-satellite/internal/satellite/config"
+)
+
+var (
+	cmdStart = cli.Command{
+		Name:  "start",
+		Usage: "start satellite",
+		Flags: []cli.Flag{
+			&cli.StringFlag{
+				Name:    "config, c",
+				Usage:   "Load configuration from `FILE`",
+				EnvVars: []string{"SATELLITE_CONFIG"},
+				Value:   "configs/satellite_config.yaml",
+			},
+		},
+		Action: func(c *cli.Context) error {
+
+			cfg := loadConfig(c)
+			return boot.Start(cfg)
+		},
+	}
+)
+
+func loadConfig(c *cli.Context) *config.SatelliteConfig {
+	configPath := c.String("config")
+	cfg := config.Load(configPath)
+	return cfg
 }
diff --git a/main.go b/cmd/main.go
similarity index 61%
copy from main.go
copy to cmd/main.go
index 94fbc40..0b51788 100644
--- a/main.go
+++ b/cmd/main.go
@@ -17,6 +17,26 @@
 
 package main
 
+import (
+	"os"
+	"time"
+
+	"github.com/urfave/cli/v2"
+)
+
+// version will be initialized when building
+var version string = "lastest"
+
 func main() {
-	print("OK")
+	app := cli.NewApp()
+	app.Name = "SkyWalking-Satellite"
+	app.Version = version
+	app.Compiled = time.Now()
+	app.Usage = "Satellite is for collecting APM data."
+	app.Description = "A lightweight collector/sidecar could be deployed closing to the target monitored system, to collect metrics, traces, and logs."
+	app.Commands = []*cli.Command{
+		&cmdStart,
+	}
+	app.Action = cli.ShowAppHelp
+	_ = app.Run(os.Args)
 }
diff --git a/configs/config.yaml b/configs/satellite_config.yaml
similarity index 52%
rename from configs/config.yaml
rename to configs/satellite_config.yaml
index 232aecb..3c71121 100644
--- a/configs/config.yaml
+++ b/configs/satellite_config.yaml
@@ -15,34 +15,38 @@
 # limitations under the License.
 #
 
-Gatherer:
-  - name: segment-receiver
-    type: segment-receiver
-    config:
-      key: value
-      key2: value2
-    queue:
-      type: mmap-queue
-      config:
+logger:
+  log_pattern: "%time [%level][%field] - %msg"
+  time_pattern: "2006-01-02 15:04:05.001"
+  level: "info"
+
+sharing:
+  clients:
+    - plugin_name: "grpc-client"
+      k: v
+  servers:
+    - plugin_name: "grpc-server"
+      k: v
+namespaces:
+  - common_config:
+      name: namespace1
+    gatherer:
+      receiver:
+        plugin_name: segment-receiver
+        server_name: "grpc-server"
+        k: v
+      queue:
+        plugin_name: mmap-queue
         key: value
-        key2: value2
-    processor: processor1
-Processor:
-  - name: processor1
-    filters:
-      - filtername1
-      - filtername2
-      - filtername3
-Sender:
-  client:
-    name: gRPC-client
-    type: gRPC
-    config:
-      key1: value1
-      key2: value2
-  forwarders:
-    - type: segment-forwarder
-      eventType: segment
-      config:
-        key1: value1
-        key2: value2
\ No newline at end of file
+    processor:
+      filters:
+        - plugin_name: filtertype1
+          key: value
+    sender:
+      flush_time: 200
+      max_buffer_size: 100
+      min_flush_events: 30
+      client_name: grpc-client
+      forwarders:
+        - plugin_name: segment-forwarder
+          key: value
diff --git a/dist/LICENSE b/dist/LICENSE
index 2ab0d2f..618fe55 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -231,4 +231,6 @@ MIT licenses
 The following components are provided under the MIT License. See project link for details.
 The text of each license is also included at licenses/LICENSE-[project].txt.
 
-	sirupsen (logrus) 1.7.0: https://github.com/sirupsen/logrus MIT
+	sirupsen (logrus) v1.7.0: https://github.com/sirupsen/logrus MIT
+	spf13 (viper) v1.7.1: https://github.com/spf13/viper MIT
+	urfave (cli) v2.3.0: https://github.com/urfave/cli MIT
diff --git a/dist/licenses/LICENSE-cli b/dist/licenses/LICENSE-cli
new file mode 100644
index 0000000..42a597e
--- /dev/null
+++ b/dist/licenses/LICENSE-cli
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2016 Jeremy Saenz & Contributors
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/dist/licenses/LICENSE-viper b/dist/licenses/LICENSE-viper
new file mode 100644
index 0000000..27457cd
--- /dev/null
+++ b/dist/licenses/LICENSE-viper
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2014 Steve Francia
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/docs/design/module_design.md b/docs/design/module_design.md
new file mode 100644
index 0000000..9782973
--- /dev/null
+++ b/docs/design/module_design.md
@@ -0,0 +1,128 @@
+# Module Design
+## Namespace
+The namespace is an isolation concept in Satellite. 
+Each namespace has one pipeline to process the telemetry data(metrics/traces/logs). Two namespaces are not sharing data.
+
+```
+                            Satellite
+ ---------------------------------------------------------------------
+|            -------------------------------------------              |
+|           |                 Namespace                 |             |
+|            -------------------------------------------              |
+|            -------------------------------------------              |
+|           |                 Namespace                 |             |
+|            -------------------------------------------              |
+|            -------------------------------------------              |
+|           |                 Namespace                 |             |
+|            -------------------------------------------              |
+ ---------------------------------------------------------------------
+```
+## Modules
+There are 3 modules in one namespace, which are Gatherer, Processor, and Sender.
+
+- The Gatherer module is responsible for fetching or receiving data and pushing the data to Queue. So there are 2 kinds of Gatherer, which are ReceiverGatherer and FetcherGatherer.
+- The Processor module is responsible for reading data from the queue and processing data by a series of filter chains.
+- The Sender module is responsible for async processing and forwarding the data to the external services in the batch mode. After sending success, Sender would also acknowledge the offset of Queue in Gatherer.
+
+```
+                            Namespace
+ --------------------------------------------------------------------
+|            ----------      -----------      --------               |
+|           | Gatherer | => | Processor | => | Sender |              |                          
+|            ----------      -----------      --------               |
+ --------------------------------------------------------------------
+```
+
+## Plugins
+
+A plugin is the minimal component in a module. Satellite has 2 plugin categories, which are sharing plugins and normal plugins.
+
+- a sharing plugin instance can be shared by multiple modules in different namespaces.
+- a normal plugin instance is only used in a fixed module of a fixed namespace.
+
+### Sharing plugin
+Currently, there are 2 kinds of sharing plugins in Satellite, which are server plugins and client plugins. They are shared in order to reduce the resource cost of connections. Server plugins are shared by the ReceiverGatherer modules in different namespaces to receive external requests, and client plugins are shared by the Sender modules in different namespaces to connect with external services, such as Kafka and OAP.
+
+```
+           Sharing Server                      Sharing Client
+ --------------------------------------------------------------------
+|       ------------------      -----------      --------            |
+|      | ReceiverGatherer | => | Processor | => | Sender |           |                          
+|       ------------------      -----------      --------            |
+ --------------------------------------------------------------------
+ --------------------------------------------------------------------
+|       ------------------      -----------      --------            |
+|      | ReceiverGatherer | => | Processor | => | Sender |           |                          
+|       ------------------      -----------      --------            |
+ --------------------------------------------------------------------
+ --------------------------------------------------------------------
+|       ------------------      -----------      --------            |
+|      | ReceiverGatherer | => | Processor | => | Sender |           |                          
+|       ------------------      -----------      --------            |
+ --------------------------------------------------------------------
+```
+
+### Normal plugin
+There are 7 kinds of normal plugins in Satellite, which are Receiver, Fetcher, Queue, Parser, Filter, Forwarder, and Fallbacker.
+
+- Receiver: receives the input APM data from the request.
+- Fetcher: fetch the APM data by fetching.
+- Queue: store the APM data to ensure the data stability.
+- Parser: supports some ways to parse data, such as parsing a CSV file.
+- Filter: processes the APM data.
+- Forwarder: forwards the APM data to the external receiver, such as Kafka and OAP.
+- Fallbacker: supports some fallback strategies, such as timer retry strategy.
+
+```
+
+                   Gatherer                                Processor
+       -------------------------------      -------------------------------------------
+      |  -----------       ---------   |   |  -----------                 -----------  |
+      | | Receiver  | ==> |  Queue   | |==>| |  Filter   | ==>  ...  ==> |  Filter   | |
+      | | /Fetcher  |     | Mem/File | |   |  -----------                 -----------  |
+      |  -----------       ----------  |   |      ||                          ||       |
+       --------------------------------    |      \/	                      \/       |
+                                           |  ---------------------------------------  |
+                                           | |             OutputEventContext        | |
+                                           |  ---------------------------------------  |
+                                            -------------------------------------------     
+                                             ||                                      
+                                             \/              Sender                  
+                                             ------------------------------------------
+                                            |  ---       ---                           |
+                                            | | B |     | D |     -----------------    |
+                                            | | A |     | I |    |Segment Forwarder|   |
+                                            | | T |     | S |    |    (Fallbacker) |   |
+                                            | | C |     | P |     -----------------    |
+                                            | | H |  => | A |                          | ===> Kafka/OAP
+                                            | | B |     | T | =>        ......         |
+                                            | | U |     | C |                          |
+                                            | | F |     | H |     -----------------    |
+                                            | | F |     | E |    | Meter  Forwarder|   |
+                                            | | E |     | R |    |    (Fallbacker) |   |
+                                            | | R |     |   |     -----------------    |
+                                            |  ---       ---                           |
+                                             ------------------------------------------
+
+
+ 1. The Fetcher/Receiver plugin would fetch or receive the input data.
+ 2. The Parser plugin would parse the input data to SerializableEvent that is supported
+    to be stored in Queue.
+ 3. The Queue plugin stores the SerializableEvent. However, whether it is serialized depends on
+    the Queue implementation. For example, the serialization is unnecessary when using a Memory
+    Queue. Once an event is pulled by the consumer of Queue, the event will be processed by
+    the filters in Processor.
+ 4. The Filter plugin would process the event to create a new event. Next, the event is passed
+    to the next filter to do the same things until the whole filters are performed. All created
+    events would be stored in the OutputEventContext. However, only the events labeled with
+    RemoteEvent type would be forwarded by Forwarder.
+ 5. After processing, the events in OutputEventContext would be stored in the BatchBuffer. When
+    the timer is triggered or the capacity limit is reached, the events in BatchBuffer would be
+    partitioned by EventType and sent to the different Forwarders, such as Segment Forwarder and
+    Meter Forwarder.
+ 6. The Forwarders in different Senders would share the remote client to avoid making duplicate
+    connections and have the same Fallbacker(FallBack strategy) to process data. When all
+    forwarders send successfully or are processed successfully by Fallbacker, the dispatcher
+    would also ack that the batch is a success.
+ ============================================================================================
+```
\ No newline at end of file
diff --git a/docs/design/module_structure.md b/docs/design/module_structure.md
new file mode 100644
index 0000000..54899aa
--- /dev/null
+++ b/docs/design/module_structure.md
@@ -0,0 +1,25 @@
+# Module structure
+
+## Overview
+Modules are the core workers in Satellite. A module is composed of specific extension plugins.
+There are 3 modules in one namespace, which are Gatherer, Processor, and Sender.
+
+- The Gatherer module is responsible for fetching or receiving data and pushing the data to Queue. So there are 2 kinds of Gatherer, which are ReceiverGatherer and FetcherGatherer.
+- The Processor module is responsible for reading data from the queue and processing data by a series of filter chains.
+- The Sender module is responsible for async processing and forwarding the data to the external services in the batch mode. After sending success, Sender would also acknowledge the offset of Queue in Gatherer.
+
+```
+                            Namespace
+ --------------------------------------------------------------------
+|            ----------      -----------      --------               |
+|           | Gatherer | => | Processor | => | Sender |              |                          
+|            ----------      -----------      --------               |
+ --------------------------------------------------------------------
+```
+
+## LifeCycle
+
+- Prepare: Prepare phase is to do some preparation work, such as registering the client status listener to the client in ReceiverGatherer.
+- Boot: Boot phase is to start the current module and run it until it receives a close signal.
+- ShutDown: ShutDown phase is to close the used resources.
+
diff --git a/docs/design/plugin_structure.md b/docs/design/plugin_structure.md
new file mode 100644
index 0000000..060a1a8
--- /dev/null
+++ b/docs/design/plugin_structure.md
@@ -0,0 +1,44 @@
+# plugin structure
+`Plugin is a common concept for Satellite, which is in all extension plugins.`
+
+## Registration mechanism
+
+The Plugin registration mechanism in Satellite is similar to the SPI registration mechanism of Java. 
+Plugin registration mechanism supports to register an interface and its implementation, that means different interfaces have different registration spaces.
+We can easily find the type of a specific plugin according to the interface and the plugin name and initialize it according to the type.
+
+structure:
+- code: `map[reflect.Type]map[string]reflect.Value`
+- meaning: `map[interface type]map[plugin name] plugin type`
+
+
+## Initialization mechanism
+
+Users can easily find a plugin type and initialize an empty plugin instance according to the previous registration mechanism. To make it convenient to set up the configuration of an extension, we define the initialization mechanism in the Plugin structure.
+
+In the initialization mechanism, `the plugin category(interface)` and `the init config` are required.
+ 
+The initialization process is as follows.
+
+1. Find the plugin name in the input config according to the fixed key `plugin_name`.
+2. Find plugin type according to the plugin category(interface) and the plugin name.
+3. Create an empty plugin.
+4. Initialize the plugin according to the merged config, which is created by the input config and the default config.
+
+
+
+## Plugin usage in Satellite
+Currently, there are 2 Plugin categories. One is the [sharing Plugin](module_design.md), and the other is the [normal Plugin](module_design.md).
+
+- Extension Plugins: 
+    - sharing plugins
+        - Server Plugin
+        - Client Plugin
+    - normal plugins
+        - Receiver Plugin
+        - Fetcher Plugin
+        - Parser Plugin
+        - Queue Plugin
+        - Filter Plugin
+        - Fallbacker Plugin
+        - Forwarder Plugin
diff --git a/docs/project_structue.md b/docs/project_structue.md
index 4a51ad8..8aefe8a 100644
--- a/docs/project_structue.md
+++ b/docs/project_structue.md
@@ -4,60 +4,79 @@
 - internal/pkg: Sharing with Core and Plugins, such as api and utils.
 - internal/satellite: The core of Satellite.
 - plugins: Contains all plugins.
-- plugins/{type}: Contains the plugins of this {type}. Satellite has 6 plugin types, which are collector, queue, parser, filter, client, and forward.
-- plugins/api: Contains the plugin definition.
+- plugins/{type}: Contains the plugins of this {type}. Satellite has 9 plugin types.
+- plugins/api: Contains the plugin definition and initializer.
 - plugins/{type}/{plugin-name}: Contains the specific plugin, and {plugin-name}-{type} would be registered as the plugin unique name in the registry. 
-
-
 ```
 .
+├── cmd
+│   ├── command.go
+│   └── main.go
 ├── configs
-│   └── config.yaml
+│   └── satellite_config.yaml
+├── docs
+│   ├── design
+│   │   ├── module_design.md
+│   │   ├── module_structure.md
+│   │   └── plugin_structure.md
+│   └── project_structue.md
 ├── internal
+│   ├── container
 │   ├── pkg
-│   │   ├── api
-│   │   │   ├── client.go
-│   │   │   ├── collector.go
-│   │   │   ├── event.go
-│   │   │   ├── filter.go
-│   │   │   ├── forwarder.go
-│   │   │   ├── parser.go
-│   │   │   ├── plugin.go
-│   │   │   └── queue.go
-│   │   └── ...
+│   │   ├── event
+│   │   ├── log
+│   │   ├── plugin
+│   │   │   ├── definition.go
+│   │   │   ├── plugin_test.go
+│   │   │   └── registry.go
 │   └── satellite
-│       ├── registry
-│       │   └── registry.go
-│       └── ...
-├── plugins
-│   ├── client
-│   │   ├── api
-│   │   │   └── client.go
-│   │   ├── grpc
-│   │   └── kakka
-│   ├── collector
-│   │   ├── api
-│   │   │   └── collector.go
-│   │   ├── example
-│   │   └── log-grpc
-│   │       └── README.md
-│   ├── fallbacker
-│   │   ├── api
-│   │   │   └── fallbacker.go
-│   ├── filter
-│   │   ├── api
-│   │   │   └── filter.go
-│   ├── forwarder
-│   │   ├── api
-│   │   │   └── forwarder.go
-│   ├── parser
-│   │   ├── api
-│   │   │   └── parser.go
-│   │   └── gork
-│   │       └── README.md
-│   └── queue
-│       ├── api
-│       │   └── queue.go
-│       └── mmap
-│           └── README.md
+│       ├── boot
+│       │   └── boot.go
+│       ├── config
+│       ├── event
+│       ├── module
+│       │   ├── api
+│       │   ├── buffer
+│       │   ├── gatherer
+│       │   ├── processor
+│       │   └── sender
+│       └── sharing
+└── plugins
+    ├── client
+    │   ├── api
+    │   │   ├── client.go
+    │   │   └── client_repository.go
+    ├── fallbacker
+    │   ├── api
+    │   │   ├── fallbacker.go
+    │   │   └── fallbacker_repository.go
+    ├── fetcher
+    │   └── api
+    │       ├── fetcher.go
+    │       └── fetcher_repository.go
+    ├── filter
+    │   ├── api
+    │   │   ├── filter.go
+    │   │   └── filter_repository.go
+    ├── forwarder
+    │   ├── api
+    │   │   ├── forwarder.go
+    │   │   └── forwarder_repository.go
+    ├── init.go
+    ├── parser
+    │   ├── api
+    │   │   ├── parser.go
+    │   │   └── parser_repository.go
+    ├── queue
+    │   ├── api
+    │   │   ├── queue.go
+    │   │   └── queue_repository.go
+    ├── receiver
+    │   ├── api
+    │   │   ├── receiver.go
+    │   │   └── receiver_repository.go
+    └── server
+        └── api
+            ├── server.go
+            └── server_repository.go
 ```
diff --git a/go.mod b/go.mod
index 71fd69d..3bf717f 100644
--- a/go.mod
+++ b/go.mod
@@ -2,4 +2,9 @@ module github.com/apache/skywalking-satellite
 
 go 1.14
 
-require github.com/sirupsen/logrus v1.7.0
+require (
+	github.com/sirupsen/logrus v1.7.0
+	github.com/spf13/viper v1.7.1
+	github.com/urfave/cli/v2 v2.3.0
+)
+
diff --git a/go.sum b/go.sum
index 4d74a1e..e84cc08 100644
--- a/go.sum
+++ b/go.sum
@@ -1,10 +1,307 @@
+cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
+cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU=
+cloud.google.com/go v0.44.2/go.mod h1:60680Gw3Yr4ikxnPRS/oxxkBccT6SA1yMk63TGekxKY=
+cloud.google.com/go v0.45.1/go.mod h1:RpBamKRgapWJb87xiFSdk4g1CME7QZg3uwTez+TSTjc=
+cloud.google.com/go v0.46.3/go.mod h1:a6bKKbmY7er1mI7TEI4lsAkts/mkhTSZK8w33B4RAg0=
+cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
+cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
+cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk=
+cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
+cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
+dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
+github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
+github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
+github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
+github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
+github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
+github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
+github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
+github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
+github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
+github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
+github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
+github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
+github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
+github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
+github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
+github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
+github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY=
+github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
+github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
+github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
+github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
+github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
+github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
+github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
+github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
+github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
+github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
+github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
+github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
+github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
+github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
+github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
+github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
+github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
+github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
+github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
+github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
+github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
+github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
+github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
+github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
+github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
+github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
+github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
+github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
+github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
+github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
+github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
+github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
+github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
+github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
+github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
+github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
+github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
+github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
+github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
+github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
+github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
+github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
+github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
+github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
+github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
+github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
+github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
+github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
+github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
+github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
+github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
+github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
+github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
+github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
+github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
+github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
+github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
+github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
+github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
+github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
+github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
+github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
+github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
+github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
+github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
+github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
+github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
+github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
+github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
+github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
+github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
+github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
+github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
+github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
+github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
+github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
+github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
+github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
+github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q=
+github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
+github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
+github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
+github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
+github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
+github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
 github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
+github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
+github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
+github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
+github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
+github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
+github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
+github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
+github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
+github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
+github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
+github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
+github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
+github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
+github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
+github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
+github.com/urfave/cli v1.22.5 h1:lNq9sAHXK2qfdI8W+GRItjCEkI+2oR4d+MEHy1CKXoU=
+github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M=
+github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
+github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
+go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
+go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
+go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
+go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
+golang.org/x/exp v0.0.0-20190829153037-c13cbed26979/go.mod h1:86+5VVa7VpoJ4kLfm080zCjGlMRFzhUhsZKEZO7MGek=
+golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY=
+golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
+golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
+golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
+golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
+golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
+golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
+golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
+golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
+golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
+golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
+golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
+golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4=
 golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
+golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
+golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
+golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
+golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc h1:NCy3Ohtk6Iny5V/reW2Ktypo4zIpWBdRJ1uFMjBxdg8=
+golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
+google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
+google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
+google.golang.org/api v0.9.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
+google.golang.org/api v0.13.0/go.mod h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsbkAI=
+google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
+google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
+google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
+google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
+google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
+google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
+google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
+google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8=
+google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
+google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
+google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
+google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
+gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
+gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
+gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
+gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
+gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
+gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
+gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
+rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
diff --git a/internal/pkg/event/event.go b/internal/pkg/event/event.go
index c8ff0f2..a834fae 100644
--- a/internal/pkg/event/event.go
+++ b/internal/pkg/event/event.go
@@ -36,6 +36,9 @@ const (
 
 type Type int32
 
+// Offset is a generic form, which allows having different definitions in different Queues.
+type Offset string
+
 // Event that implement this interface would be allowed to transmit in the Satellite.
 type Event interface {
 	// Name returns the event name.
@@ -73,20 +76,20 @@ type BatchEvents []Event
 
 // OutputEventContext is a container to store the output context.
 type OutputEventContext struct {
-	context map[string]Event
-	Offset  int64
+	Context map[string]Event
+	Offset  Offset
 }
 
 // Put puts the incoming event into the context when the event is a remote event.
 func (c *OutputEventContext) Put(event Event) {
 	if event.IsRemote() {
-		c.context[event.Name()] = event
+		c.Context[event.Name()] = event
 	}
 }
 
 // Get returns a event in the context. When the eventName does not exist, a error would be returned.
 func (c *OutputEventContext) Get(eventName string) (Event, error) {
-	e, ok := c.context[eventName]
+	e, ok := c.Context[eventName]
 	if !ok {
 		err := fmt.Errorf("cannot find the event name in OutputEventContext : %s", eventName)
 		return nil, err
diff --git a/internal/pkg/log/log.go b/internal/pkg/log/log.go
new file mode 100644
index 0000000..6040c2f
--- /dev/null
+++ b/internal/pkg/log/log.go
@@ -0,0 +1,146 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package log
+
+import (
+	"fmt"
+	"os"
+	"strings"
+	"sync"
+
+	"github.com/sirupsen/logrus"
+)
+
+// Default logger config; Go layouts spell fractional seconds with zeros (".000"), ".001" would print the month.
+const (
+	defaultLogPattern  = "%time [%level][%field] - %msg"
+	defaultTimePattern = "2006-01-02 15:04:05.000"
+)
+
+// LoggerConfig holds the settings that Init uses to build the global logger.
+type LoggerConfig struct {
+	LogPattern  string `mapstructure:"log_pattern"`
+	TimePattern string `mapstructure:"time_pattern"`
+	Level       string `mapstructure:"level"`
+}
+
+// FormatOption is a function that sets one setting of the formatter.
+type FormatOption func(f *formatter)
+
+// ConfigOption is a function that sets one setting of the logrus logger.
+type ConfigOption func(l *logrus.Logger)
+
+type formatter struct {
+	logPattern  string // output template; Format substitutes %time, %level, %field and %msg in it
+	timePattern string // layout handed to time.Time.Format for the %time placeholder
+}
+
+// Logger is the global logger; it stays nil until Init or initBySettings has run.
+var Logger *logrus.Logger
+var once sync.Once // lets Init configure the global Logger only on its first call
+
+func Init(cfg *LoggerConfig) {
+	once.Do(func() {
+		var configOpts []ConfigOption
+		var formatOpts []FormatOption
+
+		if cfg.Level != "" {
+			configOpts = append(configOpts, SetLevel(cfg.Level))
+		} else {
+			configOpts = append(configOpts, SetLevel(logrus.InfoLevel.String()))
+		}
+		if cfg.TimePattern != "" {
+			formatOpts = append(formatOpts, SetTimePattern(cfg.TimePattern))
+		} else {
+			formatOpts = append(formatOpts, SetTimePattern(defaultTimePattern))
+		}
+		if cfg.LogPattern != "" {
+			formatOpts = append(formatOpts, SetLogPattern(cfg.LogPattern))
+		} else {
+			formatOpts = append(formatOpts, SetLogPattern(defaultLogPattern))
+		}
+		initBySettings(configOpts, formatOpts)
+	})
+}
+
+// initBySettings replaces the global Logger with a new stdout logger built from the given options.
+func initBySettings(configOpts []ConfigOption, formatOpts []FormatOption) {
+	Logger = logrus.New()
+	Logger.SetOutput(os.Stdout)
+	for i := range configOpts {
+		configOpts[i](Logger)
+	}
+	// Start from an empty formatter and let the options fill in its patterns.
+	logFormatter := new(formatter)
+	for i := range formatOpts {
+		formatOpts[i](logFormatter)
+	}
+	// A pattern without any line break gets one appended, so entries do not run together.
+	if !strings.Contains(logFormatter.logPattern, "\n") {
+		logFormatter.logPattern += "\n"
+	}
+	Logger.SetFormatter(logFormatter)
+}
+
+// SetLogPattern returns an option that stores logPattern in the formatter.
+func SetLogPattern(logPattern string) FormatOption {
+	return func(target *formatter) {
+		target.logPattern = logPattern
+	}
+}
+
+// SetTimePattern returns an option that stores timePattern in the formatter.
+func SetTimePattern(timePattern string) FormatOption {
+	return func(target *formatter) {
+		target.timePattern = timePattern
+	}
+}
+
+// SetLevel returns an option that applies levelStr to the logger.
+// When levelStr is not a known logrus level, a notice is printed and the info level is used.
+func SetLevel(levelStr string) ConfigOption {
+	return func(logger *logrus.Logger) {
+		level, err := logrus.ParseLevel(levelStr)
+		if err != nil {
+			// Terminate the notice with a newline so that it does not run into the next output line.
+			fmt.Printf("logger level does not exist: %s, level would be set info\n", levelStr)
+			level = logrus.InfoLevel
+		}
+		logger.SetLevel(level)
+	}
+}
+
+// Format renders the entry by substituting the first %time, %level, %field and %msg of the log pattern.
+func (f *formatter) Format(entry *logrus.Entry) ([]byte, error) {
+	output := f.logPattern
+	output = strings.Replace(output, "%time", entry.Time.Format(f.timePattern), 1)
+	output = strings.Replace(output, "%level", entry.Level.String(), 1)
+	output = strings.Replace(output, "%field", buildFields(entry), 1)
+	output = strings.Replace(output, "%msg", entry.Message, 1) // replaced last, so the message text is never scanned for placeholders
+	return []byte(output), nil
+}
+
+func buildFields(entry *logrus.Entry) string {
+	var fields []string
+	for key, val := range entry.Data {
+		stringVal, ok := val.(string)
+		if !ok {
+			stringVal = fmt.Sprint(val)
+		}
+		fields = append(fields, key+"="+stringVal)
+	}
+	return strings.Join(fields, ",")
+}
diff --git a/internal/pkg/logger/log_test.go b/internal/pkg/log/log_test.go
similarity index 66%
rename from internal/pkg/logger/log_test.go
rename to internal/pkg/log/log_test.go
index d8ad895..2c690d8 100644
--- a/internal/pkg/logger/log_test.go
+++ b/internal/pkg/log/log_test.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package logger
+package log
 
 import (
 	"reflect"
@@ -26,7 +26,13 @@ import (
 )
 
 func TestFormatter_Format(t *testing.T) {
-	Init(SetLogPattern("[%time][%level][%field] - %msg"), SetTimePattern("2006-01-02 15:04:05,001"))
+	initBySettings([]ConfigOption{
+		SetLevel("warn"),
+	},
+		[]FormatOption{
+			SetLogPattern("[%time][%level][%field] - %msg"),
+			SetTimePattern("2006-01-02 15:04:05,001"),
+		})
 	type args struct {
 		entry *logrus.Entry
 	}
@@ -41,7 +47,7 @@ func TestFormatter_Format(t *testing.T) {
 			want: []byte("[2020-12-12 12:12:12,012][trace][] - entry1\n"),
 			args: args{
 				entry: func() *logrus.Entry {
-					entry := Log.WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
+					entry := Logger.WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
 					entry.Level = logrus.TraceLevel
 					entry.Message = "entry1"
 					return entry
@@ -53,7 +59,7 @@ func TestFormatter_Format(t *testing.T) {
 			want: []byte("[2020-12-12 12:12:12,012][warning][a=b] - entry2\n"),
 			args: args{
 				entry: func() *logrus.Entry {
-					entry := Log.WithField("a", "b").WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
+					entry := Logger.WithField("a", "b").WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
 					entry.Level = logrus.WarnLevel
 					entry.Message = "entry2"
 					return entry
@@ -64,7 +70,7 @@ func TestFormatter_Format(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			f := Log.Formatter
+			f := Logger.Formatter
 			got, _ := f.Format(tt.args.entry)
 			if !reflect.DeepEqual(got, tt.want) {
 				t.Errorf("Format() got = %s, want %s", got, tt.want)
@@ -72,3 +78,31 @@ func TestFormatter_Format(t *testing.T) {
 		})
 	}
 }
+
+// TestSetLevel checks that SetLevel applies a known level and falls back to info for an unknown one.
+func TestSetLevel(t *testing.T) {
+	type args struct {
+		opts ConfigOption
+	}
+	tests := []struct {
+		name string
+		args args
+		want logrus.Level
+	}{
+		{
+			name: "warn",
+			args: args{
+				opts: SetLevel("warn"),
+			},
+			want: logrus.WarnLevel,
+		},
+		{
+			name: "unknown level falls back to info",
+			args: args{
+				opts: SetLevel("not-a-level"),
+			},
+			want: logrus.InfoLevel,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			logger := logrus.New()
+			tt.args.opts(logger)
+			if logger.Level != tt.want {
+				t.Errorf("SetLevel() got = %s, want %s", logger.Level, tt.want)
+			}
+		})
+	}
+}
diff --git a/internal/pkg/logger/log.go b/internal/pkg/logger/log.go
deleted file mode 100644
index a62128d..0000000
--- a/internal/pkg/logger/log.go
+++ /dev/null
@@ -1,106 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package logger
-
-import (
-	"fmt"
-	"os"
-	"strings"
-	"sync"
-
-	"github.com/sirupsen/logrus"
-)
-
-// Default logger config.
-const (
-	defaultLogPattern  = "%time [%level][%field] - %msg"
-	defaultTimePattern = "2006-01-02 15:04:05.001"
-)
-
-type formatter struct {
-	logPattern  string
-	timePattern string
-}
-
-// Option is a function to set formatter config.
-type Option func(f *formatter)
-
-// Log is the global logger.
-var Log *logrus.Logger
-var once sync.Once
-
-// The Log init method, keep Log as a singleton.
-func Init(opts ...Option) {
-	once.Do(func() {
-		if Log == nil {
-			Log = logrus.New()
-		}
-		Log.SetOutput(os.Stdout)
-		Log.SetLevel(logrus.InfoLevel)
-		f := &formatter{}
-		for _, opt := range opts {
-			opt(f)
-		}
-		if f.logPattern == "" {
-			f.logPattern = defaultLogPattern
-		}
-		if f.timePattern == "" {
-			f.timePattern = defaultTimePattern
-		}
-		if !strings.Contains(f.logPattern, "\n") {
-			f.logPattern += "\n"
-		}
-		Log.SetFormatter(f)
-	})
-}
-
-// Set the log pattern in formatter.
-func SetLogPattern(logPattern string) Option {
-	return func(f *formatter) {
-		f.logPattern = logPattern
-	}
-}
-
-// Set the time pattern in formatter.
-func SetTimePattern(timePattern string) Option {
-	return func(f *formatter) {
-		f.timePattern = timePattern
-	}
-}
-
-// Format supports unified log output format that has %time, %level, %field and %msg.
-func (f *formatter) Format(entry *logrus.Entry) ([]byte, error) {
-	output := f.logPattern
-	output = strings.Replace(output, "%time", entry.Time.Format(f.timePattern), 1)
-	output = strings.Replace(output, "%level", entry.Level.String(), 1)
-	output = strings.Replace(output, "%field", buildFields(entry), 1)
-	output = strings.Replace(output, "%msg", entry.Message, 1)
-	return []byte(output), nil
-}
-
-func buildFields(entry *logrus.Entry) string {
-	var fields []string
-	for key, val := range entry.Data {
-		stringVal, ok := val.(string)
-		if !ok {
-			stringVal = fmt.Sprint(val)
-		}
-		fields = append(fields, key+"="+stringVal)
-	}
-	return strings.Join(fields, ",")
-}
diff --git a/internal/pkg/plugin/define.go b/internal/pkg/plugin/define.go
deleted file mode 100644
index ad95698..0000000
--- a/internal/pkg/plugin/define.go
+++ /dev/null
@@ -1,84 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package plugin
-
-// The following graph illustrates the relationship between different plugin interface in api package.
-//
-//
-//                   Gatherer                                Processor
-//       -------------------------------      -------------------------------------------
-//      |  -----------       ---------   |   |  -----------                 -----------  |
-//      | | Collector | ==> |  Queue   | |==>| |  Filter   | ==>  ...  ==> |  Filter   | |
-//      | | (Parser)  |     | Mem/File | |   |  -----------                 -----------  |
-//      |  -----------       ----------  |   |      ||                          ||       |
-//       --------------------------------    |      \/	                        \/       |
-//                                           |  ---------------------------------------  |
-//                                           | |             OutputEventContext        | |
-//                                           |  ---------------------------------------  |
-//                                            -------------------------------------------      -----------------
-//                                             ||                                       ----->| Sharing Client  |
-//                                             \/              Sender                  |       -----------------
-//                                             ----------------------------------------|-
-//                                            |  ---       ---                         | |
-//                                            | | B |     | D |     -----------------  | |
-//                                            | | A |     | I |    |Segment Forwarder|-| |
-//                                            | | T |     | S |    |    (Fallbacker) | | |
-//                                            | | C |     | P |     -----------------  | |
-//                                            | | H |  => | A |                        | | ===> Kakfa/OAP
-//                                            | | B |     | T | =>        ......       | |
-//                                            | | U |     | C |                        | |
-//                                            | | F |     | H |     -----------------  | |
-//                                            | | F |     | E |    | Meter  Forwarder|-| |
-//                                            | | E |     | R |    |     (Fallbacker | | |
-//                                            | | R |     |   |     -----------------  | |
-//                                            |  ---       ---                         | |
-//                                             ----------------------------------------
-//
-//
-// 1. The Collector plugin would fetch or receive the input data.
-// 2. The Parser plugin would parse the input data to SerializableEvent that is supported
-//    to be stored in Queue.
-// 3. The Queue plugin stores the SerializableEvent. However, whether serializing depends on
-//    the Queue implements. For example, the serialization is unnecessary when using a Memory
-//    Queue. Once an event is pulled by the consumer of Queue, the event will be processed by
-//    the filters in Processor.
-// 4. The Filter plugin would process the event to create a new event. Next, the event is passed
-//    to the next filter to do the same things until the whole filters are performed. All created
-//    events would be stored in the OutputEventContext. However, only the events labeled with
-//    RemoteEvent type would be forwarded by Forwarder.
-// 5. After processing, the events in OutputEventContext would be stored in the BatchBuffer. When
-//    the timer is triggered or the capacity limit is reached, the events in BatchBuffer would be
-//    partitioned by EventType and sent to the different Forwarders, such as Segment Forwarder and
-//    Meter Forwarder.
-// 6. The Follower in different Senders would share with the remote client to avoid make duplicate
-//    connections and have the same Fallbacker(FallBack strategy) to process data. When all
-//    forwarders send success or process success in Fallbacker, the dispatcher would also ack the
-//    batch is a success.
-
-// ============================================================================================
-//
-// There are four stages in the lifecycle of Satellite plugins, which are the initial phase,
-// preparing phase, running phase, and closing phase. In the running phase, each plugin has
-// its own interface definition. However, the other three phases have to be defined uniformly.
-
-type Plugin interface {
-	// Description returns the description of the specific plugin.
-	Description() string
-	// Init initialize the specific plugin.
-	InitPlugin(config map[string]interface{})
-}
diff --git a/plugins/client/api/client.go b/internal/pkg/plugin/definition.go
similarity index 51%
copy from plugins/client/api/client.go
copy to internal/pkg/plugin/definition.go
index 51d2b65..971fd89 100644
--- a/plugins/client/api/client.go
+++ b/internal/pkg/plugin/definition.go
@@ -15,33 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package api
-
-import (
-	"reflect"
-
-	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
-)
-
-// Client is a plugin interface, that defines new clients, such as gRPC client and Kafka client.
-type Client interface {
-	plugin.Plugin
-
-	// Prepare would make connection with outer service.
-	Prepare()
-	// GetConnection returns the connected client to publish events.
-	GetConnectedClient() interface{}
-	// Close the connection with outer service.
-	Close()
+package plugin
+
+// Plugin defines the plugin model in Satellite.
+type Plugin interface {
+	// Name returns the name of the specific plugin.
+	Name() string
+	// Description returns the description of the specific plugin.
+	Description() string
+	// DefaultConfig returns the default config, which is a YAML document.
+	DefaultConfig() string
 }
 
-var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
+// SharingPlugin is a plugin that could be shared by different modules in different namespaces.
+type SharingPlugin interface {
+	Plugin
 
-// Get client plugin.
-func GetClient(pluginName string, config map[string]interface{}) Client {
-	return plugin.Get(ClientCategory, pluginName, config).(Client)
+	// Prepare the sharing plugins, such as build the connection with the external services.
+	Prepare() error
+	// Close the sharing plugin.
+	Close() error
 }
 
-func init() {
-	plugin.AddPluginCategory(ClientCategory)
-}
+// Config carries the raw settings of one plugin, keyed by setting name.
+type Config map[string]interface{}
+
+// NameField is the required Config key under which the plugin name is stored.
+const NameField = "plugin_name"
diff --git a/internal/pkg/plugin/plugin_test.go b/internal/pkg/plugin/plugin_test.go
new file mode 100644
index 0000000..efebdba
--- /dev/null
+++ b/internal/pkg/plugin/plugin_test.go
@@ -0,0 +1,101 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plugin
+
+import (
+	"reflect"
+	"testing"
+)
+
+type DemoCategory interface { // plugin category used only by this test
+	Plugin
+	Say() string
+}
+
+type DemoPlugin struct { // test plugin whose fields are filled from the config through the mapstructure tags
+	Organization string `mapstructure:"organization"`
+	Project      string `mapstructure:"project"`
+}
+
+func (d *DemoPlugin) Say() string { // returns "<organization>:<project>"
+	return d.Organization + ":" + d.Project
+}
+
+func (d *DemoPlugin) Name() string { // the name under which the plugin is registered
+	return "demoplugin"
+}
+
+func (d *DemoPlugin) Description() string {
+	return "this is just a demo"
+}
+
+func (d *DemoPlugin) DefaultConfig() string { // YAML defaults, expected by the second TestPlugin case
+	return `
+organization: "ASF"
+project: "skywalking-satellite"
+`
+}
+
+// TestPlugin checks that Get builds a DemoPlugin whose fields come from the custom config,
+// with the plugin's default config filling in whatever the custom config leaves out.
+func TestPlugin(t *testing.T) {
+	tests := []struct {
+		name string
+		args Config
+		want *DemoPlugin
+	}{
+		{
+			name: "test1",
+			args: Config{
+				"plugin_name":  "demoplugin",
+				"organization": "CNCF",
+				"project":      "Fluentd",
+			},
+			want: &DemoPlugin{
+				Organization: "CNCF",
+				Project:      "Fluentd",
+			},
+		},
+		{
+			name: "demoplugin",
+			args: Config{
+				"plugin_name": "demoplugin",
+			},
+			want: &DemoPlugin{
+				Organization: "ASF",
+				Project:      "skywalking-satellite",
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			defer func() {
+				if r := recover(); r != nil {
+					// Report the panic value itself; Get can panic for reasons other than a missing plugin.
+					t.Errorf("Get() panicked: %v", r)
+				}
+			}()
+			plugin := Get(reflect.TypeOf((*DemoCategory)(nil)).Elem(), tt.args)
+			if !reflect.DeepEqual(plugin, tt.want) {
+				t.Errorf("Get() got = %v, want %v", plugin, tt.want)
+			}
+		})
+	}
+}
+
+func init() {
+	RegisterPluginCategory(reflect.TypeOf((*DemoCategory)(nil)).Elem()) // first: RegisterPlugin only matches categories already in the registry
+	RegisterPlugin(&DemoPlugin{})
+}
diff --git a/internal/pkg/plugin/registry.go b/internal/pkg/plugin/registry.go
index 4731d36..36b0d42 100644
--- a/internal/pkg/plugin/registry.go
+++ b/internal/pkg/plugin/registry.go
@@ -20,60 +20,84 @@ package plugin
 import (
 	"fmt"
 	"reflect"
-	"sync"
-)
+	"strings"
 
-// All plugins is wrote in ./plugins dir. The plugin type would be as the next level dirs,
-// such as collector, client, or queue. And the 3rd level is the plugin name, that is also
-// used as key in pluginRegistry.
+	"github.com/spf13/viper"
+)
 
-// reg is the global plugin registry
+// the global plugin registry
 var (
-	reg  map[reflect.Type]map[string]reflect.Value
-	lock sync.Mutex
+	reg map[reflect.Type]map[string]reflect.Value
 )
 
 func init() {
 	reg = make(map[reflect.Type]map[string]reflect.Value)
 }
 
-// Add new plugin category. The different plugin category could have same plugin names.
-func AddPluginCategory(pluginCategory reflect.Type) {
-	lock.Lock()
-	defer lock.Unlock()
-	reg[pluginCategory] = map[string]reflect.Value{}
+// RegisterPluginCategory adds pluginType to the global registry with an empty plugin table.
+func RegisterPluginCategory(pluginType reflect.Type) {
+	reg[pluginType] = map[string]reflect.Value{}
 }
 
 // RegisterPlugin registers the pluginType as plugin.
 // If the plugin is a pointer receiver, please pass a pointer. Otherwise, please pass a value.
-func RegisterPlugin(pluginName string, plugin interface{}) {
-	lock.Lock()
-	defer lock.Unlock()
+func RegisterPlugin(plugin Plugin) {
 	v := reflect.ValueOf(plugin)
 	success := false
 	for pCategory, pReg := range reg {
 		if v.Type().Implements(pCategory) {
-			pReg[pluginName] = v
-			fmt.Printf("register %s %s successfully ", pluginName, v.Type().String())
+			pReg[plugin.Name()] = v
+			fmt.Printf("register %s %s successfully ", plugin.Name(), v.Type().String())
 			success = true
 		}
 	}
 	if !success {
-		fmt.Printf("this type of %s is not supported to register : %s", pluginName, v.Type().String())
+		fmt.Printf("this type of %s is not supported to register : %s", plugin.Name(), v.Type().String())
 	}
 }
 
-// Get the specific plugin according to the pluginCategory and pluginName.
-func Get(pluginCategory reflect.Type, pluginName string, config map[string]interface{}) Plugin {
-	value, ok := reg[pluginCategory][pluginName]
+// Get an initialized specific plugin according to the pluginCategory and config.
+func Get(category reflect.Type, cfg Config) Plugin {
+	pluginName := nameFinder(cfg)
+	value, ok := reg[category][pluginName]
 	if !ok {
-		panic(fmt.Errorf("cannot find %s plugin, and the category of plugin is %s", pluginName, pluginCategory))
+		panic(fmt.Errorf("cannot find %s plugin, and the category of plugin is %s", pluginName, category))
 	}
 	t := value.Type()
 	if t.Kind() == reflect.Ptr {
 		t = t.Elem()
 	}
 	plugin := reflect.New(t).Interface().(Plugin)
-	plugin.InitPlugin(config)
+	initializing(plugin, cfg)
 	return plugin
 }
+
+// nameFinder returns the plugin name stored under NameField in cfg.
+// It panics when cfg is not a Config, when NameField is missing, or when its value is not a string.
+func nameFinder(cfg interface{}) string {
+	c, ok := cfg.(Config)
+	if !ok {
+		panic(fmt.Errorf("nameFinder only supports Config"))
+	}
+	raw, ok := c[NameField]
+	if !ok {
+		panic(fmt.Errorf("%s is required in Config", NameField))
+	}
+	name, ok := raw.(string)
+	if !ok {
+		// A bare type assertion would panic with a runtime error that does not mention the field.
+		panic(fmt.Errorf("%s in Config must be a string, but got %T", NameField, raw))
+	}
+	return name
+}
+
+// initializing fills the plugin fields from its default YAML config merged with cfg, and panics on any failure.
+func initializing(plugin Plugin, cfg Config) {
+	v := viper.New()
+	v.SetConfigType("yaml")
+	if plugin.DefaultConfig() != "" {
+		if err := v.ReadConfig(strings.NewReader(plugin.DefaultConfig())); err != nil {
+			panic(fmt.Errorf("cannot read default config in the plugin: %s, the error is %v", plugin.Name(), err))
+		}
+	}
+	if err := v.MergeConfigMap(cfg); err != nil {
+		panic(fmt.Errorf("%s plugin cannot merge the custom configuration, the error is %v", plugin.Name(), err))
+	}
+	if err := v.Unmarshal(plugin); err != nil {
+		panic(fmt.Errorf("cannot inject  the config to the %s plugin, the error is %v", plugin.Name(), err))
+	}
+}
diff --git a/internal/satellite/boot/boot.go b/internal/satellite/boot/boot.go
new file mode 100644
index 0000000..bd29232
--- /dev/null
+++ b/internal/satellite/boot/boot.go
@@ -0,0 +1,149 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package boot
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"os"
+	"reflect"
+	"sync"
+	"syscall"
+
+	"os/signal"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/api"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/gatherer"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/processor"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/sender"
+	"github.com/apache/skywalking-satellite/internal/satellite/sharing"
+	"github.com/apache/skywalking-satellite/plugins"
+)
+
+// ModuleContainer maps a namespace name to the modules running in that namespace.
+type ModuleContainer map[string][]api.Module
+
+// Start boots Satellite: the logger, the plugin registry, the sharing plugins and then the modules.
+// It returns after bootModules returns, or earlier with the error of a failed stage.
+func Start(cfg *config.SatelliteConfig) error {
+	log.Init(cfg.Logger)
+	// make the supported plugin types known to the registry
+	plugins.RegisterPlugins()
+	// the context is cancelled once an external shutdown signal arrives.
+	ctx, cancel := context.WithCancel(context.Background())
+	addShutdownListener(cancel)
+	// load and prepare the sharing plugins
+	sharing.Load(cfg.Sharing)
+	if err := sharing.Prepare(); err != nil {
+		return fmt.Errorf("error in preparing the sharing plugins: %v", err)
+	}
+	defer sharing.Close()
+	// init, prepare and finally boot the modules
+	modules, err := initModules(cfg)
+	if err != nil {
+		return err
+	}
+	if err := prepareModules(modules); err != nil {
+		return err
+	}
+	bootModules(ctx, modules)
+	return nil
+}
+
+// addShutdownListener calls cancel once one of the listened OS signals is received.
+func addShutdownListener(cancel context.CancelFunc) {
+	sigCh := make(chan os.Signal, 1)
+	signal.Notify(sigCh, os.Interrupt, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT)
+	go func() {
+		// block until the first signal shows up, then propagate the shutdown.
+		<-sigCh
+		cancel()
+	}()
+}
+
+// initModules init the modules and register the modules to the module container.
+func initModules(cfg *config.SatelliteConfig) (ModuleContainer, error) {
+	log.Logger.Infof("satellite is initializing...")
+	if err := initModuleConfig(cfg); err != nil {
+		return nil, err
+	}
+	// container contains the modules in each namespace.
+	container := make(ModuleContainer)
+	for _, aCfg := range cfg.Namespaces {
+		// the added sequence should follow gather, sender and processor to purpose the booting sequence.
+		var modules []api.Module
+		g := gatherer.NewGatherer(aCfg.Gatherer)
+		s := sender.NewSender(aCfg.Sender, g)
+		p := processor.NewProcessor(aCfg.Processor, s, g)
+		modules = append(modules, g, s, p)
+		container[aCfg.ModuleCommonConfig.NamespaceName] = modules
+	}
+	return container, nil
+}
+
+// initModuleConfig validates every namespace config and copies the namespace's common config
+// into its sender, gatherer and processor configs.
+// It returns an error when a namespace lacks one of the module configs or the common config.
+func initModuleConfig(cfg *config.SatelliteConfig) error {
+	for _, aCfg := range cfg.Namespaces {
+		if aCfg.Gatherer == nil || aCfg.Sender == nil || aCfg.Processor == nil {
+			return errors.New("gatherer, sender, and processor is required in the namespace config")
+		}
+		if aCfg.ModuleCommonConfig == nil {
+			// The injection below dereferences the common config.
+			return errors.New("the common config is required in the namespace config")
+		}
+	}
+	// inject module common config to the specific module config
+	for _, aCfg := range cfg.Namespaces {
+		aCfg.Sender.ModuleCommonConfig = *aCfg.ModuleCommonConfig
+		aCfg.Gatherer.ModuleCommonConfig = *aCfg.ModuleCommonConfig
+		aCfg.Processor.ModuleCommonConfig = *aCfg.ModuleCommonConfig
+	}
+	return nil
+}
+
+// prepareModules calls Prepare on every module; on the first failure it shuts down the modules handled so far and returns the error.
+func prepareModules(container ModuleContainer) error {
+	log.Logger.Infof("satellite is prepare to start...")
+	var preparedModules []api.Module
+	for ns, modules := range container {
+		for _, m := range modules {
+			preparedModules = append(preparedModules, m) // recorded before Prepare, so a module that fails is shut down as well
+			if err := m.Prepare(); err != nil {
+				for _, preparedModule := range preparedModules {
+					preparedModule.Shutdown()
+				}
+				log.Logger.Errorf("%s module of %s namespace is error in preparing stage, error is %v", reflect.TypeOf(m).String(), ns, err)
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// bootModules boots every module in its own goroutine and blocks until all Boot calls return.
+func bootModules(ctx context.Context, container ModuleContainer) {
+	log.Logger.Infof("satellite is starting...")
+	var wg sync.WaitGroup
+	for _, modules := range container {
+		for _, m := range modules {
+			m := m
+			// Add must happen before the goroutine starts; otherwise Wait can observe a zero
+			// counter and return while the modules are still booting.
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				m.Boot(ctx)
+			}()
+		}
+	}
+	wg.Wait()
+}
diff --git a/internal/satellite/config/loader.go b/internal/satellite/config/loader.go
new file mode 100644
index 0000000..1518084
--- /dev/null
+++ b/internal/satellite/config/loader.go
@@ -0,0 +1,69 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package config
+
+import (
+	"bytes"
+	"fmt"
+	"sync"
+
+	"io/ioutil"
+	"path/filepath"
+
+	"github.com/spf13/viper"
+)
+
+var (
+	cfgLock sync.Mutex // held by Load for the whole call, so concurrent Load calls run one at a time
+)
+
+// Load reads the SatelliteConfig at configPath and panics when it cannot be loaded.
+// It prints with fmt instead of the Satellite logger, because it is executed before logger initialization.
+func Load(configPath string) *SatelliteConfig {
+	cfgLock.Lock()
+	defer cfgLock.Unlock()
+	fmt.Printf("load config from : %s\n", configPath)
+	cfg, err := load(configPath)
+	if err != nil {
+		panic(fmt.Errorf("could not load config from the path: %s, the error is :%v", configPath, err))
+	}
+	return cfg
+}
+
+// load parses the YAML file at configPath into a SatelliteConfig.
+func load(configPath string) (*SatelliteConfig, error) {
+	absolutePath, err := filepath.Abs(configPath)
+	if err != nil {
+		return nil, err
+	}
+	raw, err := ioutil.ReadFile(absolutePath)
+	if err != nil {
+		return nil, err
+	}
+	reader := viper.New()
+	reader.SetConfigType("yaml")
+	if err = reader.ReadConfig(bytes.NewReader(raw)); err != nil {
+		return nil, err
+	}
+	parsed := new(SatelliteConfig)
+	if err = reader.Unmarshal(parsed); err != nil {
+		return nil, err
+	}
+	return parsed, nil
+}
diff --git a/internal/satellite/config/loader_test.go b/internal/satellite/config/loader_test.go
new file mode 100644
index 0000000..5d978c7
--- /dev/null
+++ b/internal/satellite/config/loader_test.go
@@ -0,0 +1,136 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package config
+
+import (
+	"encoding/json"
+	"reflect"
+	"testing"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/api"
+	gatherer "github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
+	processor "github.com/apache/skywalking-satellite/internal/satellite/module/processor/api"
+	sender "github.com/apache/skywalking-satellite/internal/satellite/module/sender/api"
+)
+
+// TestLoad loads ../../../configs/satellite_config.yaml through load and compares
+// the logger, the sharing plugins and the modules of the first namespace with the
+// expected SatelliteConfig. Only the namespace at index 0 is compared.
+func TestLoad(t *testing.T) {
+	type args struct {
+		configPath string
+	}
+	tests := []struct {
+		name string
+		args args
+		want *SatelliteConfig
+	}{
+		{
+			name: "Legal configuration",
+			args: args{configPath: "../../../configs/satellite_config.yaml"},
+			want: &SatelliteConfig{
+				Logger: &log.LoggerConfig{
+					LogPattern:  "%time [%level][%field] - %msg",
+					TimePattern: "2006-01-02 15:04:05.001",
+					Level:       "info",
+				},
+				Sharing: &SharingConfig{
+					Clients: []plugin.Config{
+						{
+							"plugin_name": "grpc-client",
+							"k":           "v",
+						},
+					},
+					Servers: []plugin.Config{
+						{
+							"plugin_name": "grpc-server",
+							"k":           "v",
+						},
+					},
+				},
+				Namespaces: []*NamespaceConfig{
+					{
+						ModuleCommonConfig: &api.ModuleCommonConfig{
+							NamespaceName: "namespace1",
+						},
+
+						Gatherer: &gatherer.GathererConfig{
+							ReceiverConfig: plugin.Config{
+								"plugin_name": "segment-receiver",
+								"server_name": "grpc-server",
+								"k":           "v",
+							},
+							QueueConfig: plugin.Config{
+								"plugin_name": "mmap-queue",
+								"key":         "value",
+							},
+						},
+						Processor: &processor.ProcessorConfig{
+							FilterConfig: []plugin.Config{
+								{
+									"plugin_name": "filtertype1",
+									"key":         "value",
+								},
+							},
+						},
+						Sender: &sender.SenderConfig{
+							MaxBufferSize:  100,
+							MinFlushEvents: 30,
+							FlushTime:      200,
+							ClientName:     "grpc-client",
+							ForwardersConfig: []plugin.Config{
+								{
+									"plugin_name": "segment-forwarder",
+									"key":         "value",
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			c, err := load(tt.args.configPath)
+			if err != nil {
+				t.Fatalf("cannot load config: %v", err)
+			}
+			// compare section by section, so a failure reports only the section that differs.
+			doJudgeEqual(t, c.Logger, tt.want.Logger)
+			doJudgeEqual(t, c.Sharing.Servers, tt.want.Sharing.Servers)
+			doJudgeEqual(t, c.Sharing.Clients, tt.want.Sharing.Clients)
+			doJudgeEqual(t, c.Namespaces[0].ModuleCommonConfig, tt.want.Namespaces[0].ModuleCommonConfig)
+			doJudgeEqual(t, c.Namespaces[0].Gatherer, tt.want.Namespaces[0].Gatherer)
+			doJudgeEqual(t, c.Namespaces[0].Processor, tt.want.Namespaces[0].Processor)
+			doJudgeEqual(t, c.Namespaces[0].Sender, tt.want.Namespaces[0].Sender)
+		})
+	}
+}
+
+// doJudgeEqual fails the test when a (the value got) and b (the value wanted) are not
+// deeply equal. Both values are rendered as JSON in the failure message.
+func doJudgeEqual(t *testing.T, a, b interface{}) {
+	// mark as a helper, so the failure is reported at the caller's line.
+	t.Helper()
+	if reflect.DeepEqual(a, b) {
+		return
+	}
+	ajson, err := json.Marshal(a)
+	if err != nil {
+		t.Fatalf("cannot do json format: %v", err)
+	}
+	bjson, err := json.Marshal(b)
+	if err != nil {
+		t.Fatalf("cannot do json format: %v", err)
+	}
+	t.Fatalf("config is not equal, got %s, want %s", ajson, bjson)
+}
diff --git a/internal/satellite/config/satellite_config.go b/internal/satellite/config/satellite_config.go
new file mode 100644
index 0000000..8bf3d74
--- /dev/null
+++ b/internal/satellite/config/satellite_config.go
@@ -0,0 +1,48 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package config
+
+import (
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/api"
+	gatherer "github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
+	processor "github.com/apache/skywalking-satellite/internal/satellite/module/processor/api"
+	sender "github.com/apache/skywalking-satellite/internal/satellite/module/sender/api"
+)
+
+// SatelliteConfig is the root configuration used to initialize Satellite.
+// The mapstructure tags name the top-level keys each field is decoded from.
+type SatelliteConfig struct {
+	// Logger is the logger configuration.
+	Logger     *log.LoggerConfig  `mapstructure:"logger"`
+	// Namespaces holds one NamespaceConfig per configured namespace.
+	Namespaces []*NamespaceConfig `mapstructure:"namespaces"`
+	// Sharing holds the plugins shared by the namespaces.
+	Sharing    *SharingConfig     `mapstructure:"sharing"`
+}
+
+// SharingConfig contains the plugins that could be shared by every namespace.
+// Sharing them is useful to reduce the resources cost.
+type SharingConfig struct {
+	// Clients lists the configs of the shared client plugins.
+	Clients []plugin.Config `mapstructure:"clients"`
+	// Servers lists the configs of the shared server plugins.
+	Servers []plugin.Config `mapstructure:"servers"`
+}
+
+// NamespaceConfig holds the configs of the modules that belong to one namespace.
+type NamespaceConfig struct {
+	// ModuleCommonConfig is decoded from the "common_config" key.
+	ModuleCommonConfig *api.ModuleCommonConfig    `mapstructure:"common_config"`
+	// Gatherer is the gatherer module config.
+	Gatherer           *gatherer.GathererConfig   `mapstructure:"gatherer"`
+	// Processor is the processor module config.
+	Processor          *processor.ProcessorConfig `mapstructure:"processor"`
+	// Sender is the sender module config.
+	Sender             *sender.SenderConfig       `mapstructure:"sender"`
+}
diff --git a/main.go b/internal/satellite/module/api/module.go
similarity index 57%
copy from main.go
copy to internal/satellite/module/api/module.go
index 94fbc40..a36232a 100644
--- a/main.go
+++ b/internal/satellite/module/api/module.go
@@ -15,8 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package main
+package api
 
-func main() {
-	print("OK")
+import (
+	"context"
+)
+
+// Module is a custom plugin interface, which defines the processing.
+// TODO add metrics func
+type Module interface {
+
+	// Prepare does the preparing work, such as building connections with external services.
+	Prepare() error
+	// Boot starts the module. It has no return value, so a start failure cannot be
+	// reported through it. When a stop signal is received or an exception occurs,
+	// the shutdown function would be called.
+	Boot(ctx context.Context)
+
+	// Shutdown could do some clean job to close Module.
+	Shutdown()
+}
+
+// ModuleCommonConfig holds the settings that are common to the module configs.
+type ModuleCommonConfig struct {
+	// NamespaceName is decoded from the "name" key.
+	NamespaceName string `mapstructure:"name"`
 }
diff --git a/internal/satellite/module/buffer/buffer.go b/internal/satellite/module/buffer/buffer.go
new file mode 100644
index 0000000..c2633a6
--- /dev/null
+++ b/internal/satellite/module/buffer/buffer.go
@@ -0,0 +1,81 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package buffer
+
+import (
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+)
+
+// BatchBuffer is a buffer to cache the input data in Sender.
+// Items are appended through Add until the capacity is reached.
+// NOTE(review): no locking is visible here, so concurrent use presumably needs
+// external synchronization — confirm against the callers.
+type BatchBuffer struct {
+	buf   []*event.OutputEventContext // cache
+	first event.Offset                // the first OutputEventContext offset
+	last  event.Offset                // the last OutputEventContext offset
+	size  int                         // usage size
+	cap   int                         // the max capacity
+}
+
+// NewBatchBuffer creates an empty BatchBuffer whose backing slice holds capacity slots.
+// The offsets and the usage size start at their zero values.
+func NewBatchBuffer(capacity int) *BatchBuffer {
+	b := new(BatchBuffer)
+	b.buf = make([]*event.OutputEventContext, capacity)
+	b.cap = capacity
+	return b
+}
+
+// Buf returns the cached data in BatchBuffer.
+// The returned slice is the whole backing slice, whose length is the capacity given to
+// NewBatchBuffer; the slots at index Len() and above have not been filled by Add.
+func (b *BatchBuffer) Buf() []*event.OutputEventContext {
+	return b.buf
+}
+
+// First returns the offset of the first OutputEventContext added to the buffer.
+// It is empty while nothing has been added.
+func (b *BatchBuffer) First() event.Offset {
+	return b.first
+}
+
+// Last returns the last OutputEventContext offset that Add has recorded.
+func (b *BatchBuffer) Last() event.Offset {
+	return b.last
+}
+
+// Len returns the usage size, that is the number of items stored by Add.
+func (b *BatchBuffer) Len() int {
+	return b.size
+}
+
+// Add appends data to the buffer and updates the tracked offsets.
+// The item is rejected with an error log when the buffer is full or when the
+// offset of data is empty; the buffer is left unchanged in both cases.
+func (b *BatchBuffer) Add(data *event.OutputEventContext) {
+	if b.size == b.cap {
+		log.Logger.Errorf("cannot add one item to the fulling BatchBuffer, the capacity is %d", b.cap)
+		return
+	} else if data.Offset == "" {
+		log.Logger.Errorf("cannot add one item to BatchBuffer because the input data is illegal, the offset is empty")
+		return
+	}
+	if b.size == 0 {
+		b.first = data.Offset
+	}
+	// the newest item is always the last one, including when it is also the first one.
+	b.last = data.Offset
+	b.buf[b.size] = data
+	b.size++
+}
diff --git a/plugins/fallbacker/api/fallbacker.go b/internal/satellite/module/buffer/buffer_test.go
similarity index 50%
copy from plugins/fallbacker/api/fallbacker.go
copy to internal/satellite/module/buffer/buffer_test.go
index bde56ef..c9f5715 100644
--- a/plugins/fallbacker/api/fallbacker.go
+++ b/internal/satellite/module/buffer/buffer_test.go
@@ -15,30 +15,58 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package api
+package buffer
 
 import (
-	"reflect"
+	"testing"
 
 	"github.com/apache/skywalking-satellite/internal/pkg/event"
-	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
 )
 
-// Fallbacker is a plugin interface, that defines some fallback strategies.
-type Fallbacker interface {
-	plugin.Plugin
-
-	//  FallBack returns nil when finishing a successful process and returns a new Fallbacker when failure.
-	FallBack(batch event.BatchEvents) Fallbacker
-}
-
-var FallbackerCategory = reflect.TypeOf((*Fallbacker)(nil)).Elem()
-
-// Get Fallbacker plugin.
-func GetFallbacker(pluginName string, config map[string]interface{}) Fallbacker {
-	return plugin.Get(FallbackerCategory, pluginName, config).(Fallbacker)
+// TestNewBuffer adds five items to one BatchBuffer of capacity 3 and checks Len after
+// each Add. The cases share the buffer and depend on their order: add-4 and add-5
+// expect Len to stay at 3 once the buffer is full.
+func TestNewBuffer(t *testing.T) {
+	buffer := NewBatchBuffer(3)
+	tests := []struct {
+		name string
+		args *event.OutputEventContext
+		want int
+	}{
+		{
+			name: "add-1",
+			args: &event.OutputEventContext{Offset: "1"},
+			want: 1,
+		},
+		{
+			name: "add-2",
+			args: &event.OutputEventContext{Offset: "1"},
+			want: 2,
+		},
+		{
+			name: "add-3",
+			args: &event.OutputEventContext{Offset: "1"},
+			want: 3,
+		},
+		{
+			name: "add-4",
+			args: &event.OutputEventContext{Offset: "1"},
+			want: 3,
+		},
+		{
+			name: "add-5",
+			args: &event.OutputEventContext{Offset: "1"},
+			want: 3,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			buffer.Add(tt.args)
+			if got := buffer.Len(); got != tt.want {
+				t.Errorf("Buffer Len() = %v, want %v", got, tt.want)
+			}
+		})
+	}
 }
 
+// init initializes the log package with an empty LoggerConfig before the tests run.
+// NOTE(review): presumably needed because BatchBuffer.Add logs through log.Logger — confirm.
 func init() {
-	plugin.AddPluginCategory(FallbackerCategory)
+	log.Init(&log.LoggerConfig{})
 }
diff --git a/plugins/client/api/client.go b/internal/satellite/module/gatherer/api/config.go
similarity index 57%
copy from plugins/client/api/client.go
copy to internal/satellite/module/gatherer/api/config.go
index 51d2b65..9978028 100644
--- a/plugins/client/api/client.go
+++ b/internal/satellite/module/gatherer/api/config.go
@@ -18,30 +18,21 @@
 package api
 
 import (
-	"reflect"
-
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/api"
 )
 
-// Client is a plugin interface, that defines new clients, such as gRPC client and Kafka client.
-type Client interface {
-	plugin.Plugin
-
-	// Prepare would make connection with outer service.
-	Prepare()
-	// GetConnection returns the connected client to publish events.
-	GetConnectedClient() interface{}
-	// Close the connection with outer service.
-	Close()
-}
+// GathererConfig contains all implementation fields.
+// NewGatherer creates a receiver gatherer when ReceiverConfig is set, and otherwise
+// a fetcher gatherer when FetcherConfig is set.
+type GathererConfig struct {
+	// common config
+	api.ModuleCommonConfig
+	QueueConfig plugin.Config `mapstructure:"queue"` // queue plugin config
 
-var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
-
-// Get client plugin.
-func GetClient(pluginName string, config map[string]interface{}) Client {
-	return plugin.Get(ClientCategory, pluginName, config).(Client)
-}
+	// ReceiverGatherer
+	ReceiverConfig plugin.Config `mapstructure:"receiver"`    // collector plugin config
+	ServerName     string        `mapstructure:"server_name"` // name of the shared server plugin the receiver depends on
 
-func init() {
-	plugin.AddPluginCategory(ClientCategory)
+	// FetcherGatherer
+	FetcherConfig plugin.Config `mapstructure:"fetcher"`        // fetcher plugin config
+	FetchInterval int           `mapstructure:"fetch_interval"` // fetch interval, the time unit is millisecond
 }
diff --git a/main.go b/internal/satellite/module/gatherer/api/gatherer.go
similarity index 62%
copy from main.go
copy to internal/satellite/module/gatherer/api/gatherer.go
index 94fbc40..2216209 100644
--- a/main.go
+++ b/internal/satellite/module/gatherer/api/gatherer.go
@@ -15,8 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package main
+package api
 
-func main() {
-	print("OK")
+import (
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/api"
+	queue "github.com/apache/skywalking-satellite/plugins/queue/api"
+)
+
+// Gatherer is the APM data collection module in Satellite.
+type Gatherer interface {
+	api.Module
+	// OutputDataChannel returns the receive-only channel that transfers the apm data to the processor module.
+	OutputDataChannel() <-chan *queue.SequenceEvent
+
+	// Ack reports lastOffset back to the gatherer. The implementations in the gatherer
+	// package forward it to their queue plugin.
+	Ack(lastOffset event.Offset)
 }
diff --git a/internal/satellite/module/gatherer/create.go b/internal/satellite/module/gatherer/create.go
new file mode 100644
index 0000000..1eab2ac
--- /dev/null
+++ b/internal/satellite/module/gatherer/create.go
@@ -0,0 +1,61 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package gatherer
+
+import (
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
+	"github.com/apache/skywalking-satellite/internal/satellite/sharing"
+	fetcher "github.com/apache/skywalking-satellite/plugins/fetcher/api"
+	queue "github.com/apache/skywalking-satellite/plugins/queue/api"
+	receiver "github.com/apache/skywalking-satellite/plugins/receiver/api"
+	server "github.com/apache/skywalking-satellite/plugins/server/api"
+)
+
+// NewGatherer returns the gatherer module that matches cfg: a receiver gatherer when
+// the receiver config is set, otherwise a fetcher gatherer when the fetcher config
+// is set, and nil when neither of them is set.
+func NewGatherer(cfg *api.GathererConfig) api.Gatherer {
+	switch {
+	case cfg.ReceiverConfig != nil:
+		return newReceiverGatherer(cfg)
+	case cfg.FetcherConfig != nil:
+		return newFetcherGatherer(cfg)
+	default:
+		return nil
+	}
+}
+
+// newFetcherGatherer creates a gatherer with the fetcher role.
+// The queue and fetcher plugins are looked up from cfg, and the output channel is unbuffered.
+func newFetcherGatherer(cfg *api.GathererConfig) *FetcherGatherer {
+	log.Logger.Infof("fetcher gatherer module of %s namespace is being initialized", cfg.NamespaceName)
+	g := &FetcherGatherer{config: cfg}
+	g.runningQueue = queue.GetQueue(cfg.QueueConfig)
+	g.runningFetcher = fetcher.GetFetcher(cfg.FetcherConfig)
+	g.outputChannel = make(chan *queue.SequenceEvent)
+	return g
+}
+
+// newReceiverGatherer creates a gatherer with the receiver role.
+// The queue and receiver plugins are looked up from cfg, and the server is taken from
+// the sharing plugins by cfg.ServerName. It panics with a descriptive message when no
+// server plugin is registered under that name, instead of failing on a bare type assertion.
+func newReceiverGatherer(cfg *api.GathererConfig) *ReceiverGatherer {
+	log.Logger.Infof("receiver gatherer module of %s namespace is being initialized", cfg.NamespaceName)
+	runningQueue := queue.GetQueue(cfg.QueueConfig)
+	runningReceiver := receiver.GetReceiver(cfg.ReceiverConfig)
+	runningServer, ok := sharing.Manager[cfg.ServerName].(server.Server)
+	if !ok {
+		panic("cannot find the server plugin named \"" + cfg.ServerName + "\" in the sharing plugins")
+	}
+	return &ReceiverGatherer{
+		config:          cfg,
+		runningQueue:    runningQueue,
+		runningReceiver: runningReceiver,
+		runningServer:   runningServer,
+		outputChannel:   make(chan *queue.SequenceEvent),
+	}
+}
diff --git a/internal/satellite/module/gatherer/fetcher_gatherer.go b/internal/satellite/module/gatherer/fetcher_gatherer.go
new file mode 100644
index 0000000..77ac6d0
--- /dev/null
+++ b/internal/satellite/module/gatherer/fetcher_gatherer.go
@@ -0,0 +1,89 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package gatherer
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
+	fetcher "github.com/apache/skywalking-satellite/plugins/fetcher/api"
+	queue "github.com/apache/skywalking-satellite/plugins/queue/api"
+)
+
+// FetcherGatherer is the gatherer that pulls events from a fetcher plugin on a timer,
+// pushes them into a queue plugin and forwards the queued events to its output channel.
+type FetcherGatherer struct {
+	// config
+	config *api.GathererConfig
+
+	// dependency plugins
+	runningFetcher fetcher.Fetcher
+	runningQueue   queue.Queue
+
+	// self components
+	outputChannel chan *queue.SequenceEvent
+}
+
+// Prepare prepares the queue plugin that buffers the fetched events, in the same way as
+// ReceiverGatherer.Prepare does, and returns the queue error when that fails.
+func (f *FetcherGatherer) Prepare() error {
+	log.Logger.Infof("fetcher gatherer module of %s namespace is preparing", f.config.NamespaceName)
+	if err := f.runningQueue.Prepare(); err != nil {
+		log.Logger.Errorf("the %s queue of %s namespace was failed to initialize, error is: %v", f.runningQueue.Name(), f.config.NamespaceName, err)
+		return err
+	}
+	return nil
+}
+
+// Boot runs the gathering loop and blocks until ctx is done.
+// On every tick of the fetch interval it fetches events and pushes them into the queue;
+// events popped from the queue are forwarded to the output channel. When ctx is done
+// it calls Shutdown and returns.
+func (f *FetcherGatherer) Boot(ctx context.Context) {
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		// time.NewTicker panics when the interval is not positive, so fetch_interval has to be configured.
+		timeTicker := time.NewTicker(time.Duration(f.config.FetchInterval) * time.Millisecond)
+		// release the ticker once the loop exits.
+		defer timeTicker.Stop()
+		for {
+			select {
+			case <-timeTicker.C:
+				// the loop variable must not be named event, which would shadow the imported package.
+				for _, fetched := range f.runningFetcher.Fetch() {
+					if err := f.runningQueue.Push(fetched); err != nil {
+						// todo add abandonedCount metrics
+						log.Logger.Errorf("cannot put event into queue in %s namespace, %v", f.config.NamespaceName, err)
+					}
+				}
+			case e := <-f.runningQueue.Pop():
+				f.outputChannel <- e
+			case <-ctx.Done():
+				f.Shutdown()
+				return
+			}
+		}
+	}()
+	wg.Wait()
+}
+
+// Shutdown closes the queue plugin and logs an error when the closing fails.
+func (f *FetcherGatherer) Shutdown() {
+	log.Logger.Infof("fetcher gatherer module of %s namespace is closing", f.config.NamespaceName)
+	if err := f.runningQueue.Close(); err != nil {
+		log.Logger.Errorf("failure occurs when closing %s queue  in %s namespace :%v", f.runningQueue.Name(), f.config.NamespaceName, err)
+	}
+}
+
+// OutputDataChannel returns the receive-only view of the channel that Boot writes the queued events to.
+func (f *FetcherGatherer) OutputDataChannel() <-chan *queue.SequenceEvent {
+	return f.outputChannel
+}
+
+// Ack forwards lastOffset to the Ack of the queue plugin.
+func (f *FetcherGatherer) Ack(lastOffset event.Offset) {
+	f.runningQueue.Ack(lastOffset)
+}
diff --git a/internal/satellite/module/gatherer/receiver_gatherer.go b/internal/satellite/module/gatherer/receiver_gatherer.go
new file mode 100644
index 0000000..5735474
--- /dev/null
+++ b/internal/satellite/module/gatherer/receiver_gatherer.go
@@ -0,0 +1,92 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package gatherer
+
+import (
+	"context"
+	"sync"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
+	queue "github.com/apache/skywalking-satellite/plugins/queue/api"
+	receiver "github.com/apache/skywalking-satellite/plugins/receiver/api"
+	server "github.com/apache/skywalking-satellite/plugins/server/api"
+)
+
+// ReceiverGatherer is the gatherer that takes events from a receiver plugin, pushes
+// them into a queue plugin and forwards the queued events to its output channel.
+type ReceiverGatherer struct {
+	// config
+	config *api.GathererConfig
+
+	// dependency plugins
+	runningReceiver receiver.Receiver
+	runningQueue    queue.Queue
+	runningServer   server.Server
+
+	// self components
+	outputChannel chan *queue.SequenceEvent
+}
+
+// Prepare registers the handler of the receiver on the server and prepares the queue plugin.
+// It returns the queue error when the queue cannot be prepared.
+func (r *ReceiverGatherer) Prepare() error {
+	log.Logger.Infof("receiver gatherer module of %s namespace is preparing", r.config.NamespaceName)
+	r.runningReceiver.RegisterHandler(r.runningServer)
+	if err := r.runningQueue.Prepare(); err != nil {
+		// a failed preparation is an error, so log it at error level together with its cause.
+		log.Logger.Errorf("the %s queue of %s namespace was failed to initialize, error is: %v", r.runningQueue.Name(), r.config.NamespaceName, err)
+		return err
+	}
+	return nil
+}
+
+// Boot runs the gathering loop and blocks until ctx is done.
+// Events taken from the receiver channel are pushed into the queue, and events popped
+// from the queue are forwarded to the output channel. When ctx is done it calls
+// Shutdown and returns.
+func (r *ReceiverGatherer) Boot(ctx context.Context) {
+	var done sync.WaitGroup
+	done.Add(1)
+	go func() {
+		defer done.Done()
+		for {
+			select {
+			case received := <-r.runningReceiver.Channel():
+				if pushErr := r.runningQueue.Push(received); pushErr != nil {
+					// todo add abandonedCount metrics
+					log.Logger.Errorf("cannot put event into queue in %s namespace, error is: %v", r.config.NamespaceName, pushErr)
+				}
+			case queued := <-r.runningQueue.Pop():
+				r.outputChannel <- queued
+			case <-ctx.Done():
+				r.Shutdown()
+				return
+			}
+		}
+	}()
+	done.Wait()
+}
+
+// Shutdown closes the queue plugin and logs an error when the closing fails.
+func (r *ReceiverGatherer) Shutdown() {
+	log.Logger.Infof("receiver gatherer module of %s namespace is closing", r.config.NamespaceName)
+	if err := r.runningQueue.Close(); err != nil {
+		log.Logger.Errorf("failure occurs when closing %s queue  in %s namespace, error is: %v", r.runningQueue.Name(), r.config.NamespaceName, err)
+	}
+}
+
+// OutputDataChannel returns the receive-only view of the channel that Boot writes the queued events to.
+func (r *ReceiverGatherer) OutputDataChannel() <-chan *queue.SequenceEvent {
+	return r.outputChannel
+}
+
+// Ack forwards lastOffset to the Ack of the queue plugin.
+func (r *ReceiverGatherer) Ack(lastOffset event.Offset) {
+	r.runningQueue.Ack(lastOffset)
+}
diff --git a/main.go b/internal/satellite/module/processor/api/config.go
similarity index 70%
copy from main.go
copy to internal/satellite/module/processor/api/config.go
index 94fbc40..934ab94 100644
--- a/main.go
+++ b/internal/satellite/module/processor/api/config.go
@@ -15,8 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package main
+package api
 
-func main() {
-	print("OK")
+import (
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/api"
+)
+
+// ProcessorConfig contains all implementation fields of the processor module.
+type ProcessorConfig struct {
+	api.ModuleCommonConfig
+
+	FilterConfig []plugin.Config `mapstructure:"filters"` // filter plugins, one config per filter
 }
diff --git a/main.go b/internal/satellite/module/processor/api/processor.go
similarity index 81%
copy from main.go
copy to internal/satellite/module/processor/api/processor.go
index 94fbc40..edeb865 100644
--- a/main.go
+++ b/internal/satellite/module/processor/api/processor.go
@@ -15,8 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package main
+package api
 
-func main() {
-	print("OK")
+import "github.com/apache/skywalking-satellite/internal/satellite/module/api"
+
+// Processor is the APM data processing module in Satellite.
+// It declares no methods of its own beyond the embedded api.Module.
+type Processor interface {
+	api.Module
 }
diff --git a/internal/satellite/module/processor/create.go b/internal/satellite/module/processor/create.go
new file mode 100644
index 0000000..c5933b2
--- /dev/null
+++ b/internal/satellite/module/processor/create.go
@@ -0,0 +1,41 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package processor
+
+import (
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	gatherer "github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/processor/api"
+	sender "github.com/apache/skywalking-satellite/internal/satellite/module/sender/api"
+	filter "github.com/apache/skywalking-satellite/plugins/filter/api"
+)
+
+// NewProcessor creates the processor module of cfg, wired to the sender s and the
+// gatherer g. One filter plugin is created per entry of the filter configs, in the
+// configured order.
+func NewProcessor(cfg *api.ProcessorConfig, s sender.Sender, g gatherer.Gatherer) api.Processor {
+	log.Logger.Infof("processor module of %s namespace is being initialized", cfg.NamespaceName)
+	filters := make([]filter.Filter, 0, len(cfg.FilterConfig))
+	for i := range cfg.FilterConfig {
+		filters = append(filters, filter.GetFilter(cfg.FilterConfig[i]))
+	}
+	return &Processor{
+		sender:         s,
+		gatherer:       g,
+		config:         cfg,
+		runningFilters: filters,
+	}
+}
diff --git a/internal/satellite/module/processor/processor.go b/internal/satellite/module/processor/processor.go
new file mode 100644
index 0000000..640f605
--- /dev/null
+++ b/internal/satellite/module/processor/processor.go
@@ -0,0 +1,82 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package processor
+
+import (
+	"context"
+	"sync"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	gatherer "github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
+	processor "github.com/apache/skywalking-satellite/internal/satellite/module/processor/api"
+	sender "github.com/apache/skywalking-satellite/internal/satellite/module/sender/api"
+	filter "github.com/apache/skywalking-satellite/plugins/filter/api"
+)
+
+// Processor is the processing module in Satellite.
+// It reads the events of its gatherer, runs them through its filters and hands the
+// result to its sender.
+type Processor struct {
+	// config
+	config *processor.ProcessorConfig
+
+	// dependency plugins
+	runningFilters []filter.Filter
+
+	// dependency modules
+	sender   sender.Sender
+	gatherer gatherer.Gatherer
+}
+
+// Prepare has nothing to prepare for the processor and always returns nil.
+func (p *Processor) Prepare() error {
+	return nil
+}
+
+// Boot consumes the output channel of the gatherer until ctx is done, and blocks meanwhile.
+// Every input event is wrapped into an OutputEventContext that keeps its offset, passed
+// through all the filters in order, and then written to the input channel of the sender.
+// When ctx is done it calls Shutdown and returns.
+func (p *Processor) Boot(ctx context.Context) {
+	log.Logger.Infof("processor module of %s namespace is running", p.config.NamespaceName)
+	var done sync.WaitGroup
+	done.Add(1)
+
+	go func() {
+		defer done.Done()
+		for {
+			select {
+			// take the next input event from the output channel of the gatherer.
+			case in := <-p.gatherer.OutputDataChannel():
+				out := &event.OutputEventContext{
+					Offset:  in.Offset,
+					Context: make(map[string]event.Event),
+				}
+				out.Put(in.Event)
+				// let every filter put the events it needs into the OutputEventContext.
+				for i := range p.runningFilters {
+					p.runningFilters[i].Process(out)
+				}
+				// hand the final context, which may contain many events, to the sender.
+				p.sender.InputDataChannel() <- out
+			case <-ctx.Done():
+				p.Shutdown()
+				return
+			}
+		}
+	}()
+	done.Wait()
+}
+
+// Shutdown does nothing, because the processor holds no resource to release.
+func (p *Processor) Shutdown() {
+}
diff --git a/plugins/client/api/client.go b/internal/satellite/module/sender/api/config.go
similarity index 57%
copy from plugins/client/api/client.go
copy to internal/satellite/module/sender/api/config.go
index 51d2b65..1fff0d7 100644
--- a/plugins/client/api/client.go
+++ b/internal/satellite/module/sender/api/config.go
@@ -18,30 +18,18 @@
 package api
 
 import (
-	"reflect"
-
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/api"
 )
 
-// Client is a plugin interface, that defines new clients, such as gRPC client and Kafka client.
-type Client interface {
-	plugin.Plugin
-
-	// Prepare would make connection with outer service.
-	Prepare()
-	// GetConnection returns the connected client to publish events.
-	GetConnectedClient() interface{}
-	// Close the connection with outer service.
-	Close()
-}
-
-var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
-
-// Get client plugin.
-func GetClient(pluginName string, config map[string]interface{}) Client {
-	return plugin.Get(ClientCategory, pluginName, config).(Client)
-}
+// SenderConfig is the configuration of the sender module.
+type SenderConfig struct {
+	api.ModuleCommonConfig
+	// plugins config
+	ForwardersConfig []plugin.Config `mapstructure:"forwarders"`  // forwarder plugins config
+	FallbackerConfig plugin.Config   `mapstructure:"fallbacker"`  // fallbacker plugins config
+	ClientName       string          `mapstructure:"client_name"` // client plugin name
 
-func init() {
-	plugin.AddPluginCategory(ClientCategory)
+	MaxBufferSize  int `mapstructure:"max_buffer_size"`  // the max buffer capacity
+	MinFlushEvents int `mapstructure:"min_flush_events"` // the min flush events when receives a timer flush signal
+	FlushTime      int `mapstructure:"flush_time"`       // the flush period, in milliseconds
 }
diff --git a/main.go b/internal/satellite/module/sender/api/sender.go
similarity index 70%
copy from main.go
copy to internal/satellite/module/sender/api/sender.go
index 94fbc40..e1e3d0e 100644
--- a/main.go
+++ b/internal/satellite/module/sender/api/sender.go
@@ -15,8 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package main
+package api
 
-func main() {
-	print("OK")
+import (
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/api"
+)
+
+// Sender is the module interface that exposes an input channel of output event contexts.
+type Sender interface {
+	api.Module
+
+	// InputDataChannel is a blocking channel to receive the apm data from the downstream processor module.
+	InputDataChannel() chan<- *event.OutputEventContext
 }
diff --git a/internal/satellite/module/sender/create.go b/internal/satellite/module/sender/create.go
new file mode 100644
index 0000000..43ae014
--- /dev/null
+++ b/internal/satellite/module/sender/create.go
@@ -0,0 +1,51 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sender
+
+import (
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/buffer"
+	gatherer "github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/sender/api"
+	"github.com/apache/skywalking-satellite/internal/satellite/sharing"
+	client "github.com/apache/skywalking-satellite/plugins/client/api"
+	fallbacker "github.com/apache/skywalking-satellite/plugins/fallbacker/api"
+	forwarder "github.com/apache/skywalking-satellite/plugins/forwarder/api"
+)
+
+// NewSender creates a Sender from cfg, bound to the gatherer g. The fallbacker and the
+// forwarders are looked up by their plugin configs, and the client is taken from the
+// sharing Manager by cfg.ClientName.
+func NewSender(cfg *api.SenderConfig, g gatherer.Gatherer) api.Sender {
+	log.Logger.Infof("sender module of %s namespace is being initialized", cfg.NamespaceName)
+	fb := fallbacker.GetFallbacker(cfg.FallbackerConfig)
+	cli := sharing.Manager[cfg.ClientName].(client.Client)
+	// logicInput is left at its zero value (nil).
+	s := &Sender{
+		config:            cfg,
+		runningForwarders: make([]forwarder.Forwarder, 0, len(cfg.ForwardersConfig)),
+		runningFallbacker: fb,
+		runningClient:     cli,
+		gatherer:          g,
+		physicalInput:     make(chan *event.OutputEventContext),
+		listener:          make(chan client.ClientStatus),
+		flushChannel:      make(chan *buffer.BatchBuffer, 1),
+		buffer:            buffer.NewBatchBuffer(cfg.MaxBufferSize),
+	}
+	for i := range cfg.ForwardersConfig {
+		s.runningForwarders = append(s.runningForwarders, forwarder.GetForwarder(cfg.ForwardersConfig[i]))
+	}
+	return s
+}
diff --git a/internal/satellite/module/sender/sender.go b/internal/satellite/module/sender/sender.go
new file mode 100644
index 0000000..19a469b
--- /dev/null
+++ b/internal/satellite/module/sender/sender.go
@@ -0,0 +1,163 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sender
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/buffer"
+	gatherer "github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/sender/api"
+	client "github.com/apache/skywalking-satellite/plugins/client/api"
+	fallbacker "github.com/apache/skywalking-satellite/plugins/fallbacker/api"
+	forwarder "github.com/apache/skywalking-satellite/plugins/forwarder/api"
+)
+
+// Sender is the forward module in Satellite. It buffers incoming event contexts and
+// forwards them in batches through the configured forwarders.
+type Sender struct {
+	// config
+	config *api.SenderConfig
+
+	// dependency plugins
+	runningForwarders []forwarder.Forwarder
+	runningFallbacker fallbacker.Fallbacker
+	runningClient     client.Client
+
+	// dependency modules
+	gatherer gatherer.Gatherer
+
+	// self components
+	logicInput    chan *event.OutputEventContext // logic input channel: either physicalInput or nil
+	physicalInput chan *event.OutputEventContext // physical input channel
+	listener      chan client.ClientStatus       // client status listener
+	flushChannel  chan *buffer.BatchBuffer       // forwarder flush channel
+	buffer        *buffer.BatchBuffer            // cache the downstream input data
+}
+
+// Prepare registers the sender's status channel as a listener on the client and opens
+// the input by pointing logicInput at physicalInput. It always returns nil.
+func (s *Sender) Prepare() error {
+	log.Logger.Infof("sender module of %s namespace is preparing", s.config.NamespaceName)
+	s.runningClient.RegisterListener(s.listener)
+	s.logicInput = s.physicalInput
+	return nil
+}
+
+// Boot fetches the downstream input data and forward to external services, such as Kafka and OAP receiver.
+// It blocks until ctx is cancelled and both workers have stopped, then calls Shutdown to
+// flush whatever is still pending.
+func (s *Sender) Boot(ctx context.Context) {
+	log.Logger.Infof("sender module of %s namespace is running", s.config.NamespaceName)
+	var wg sync.WaitGroup
+	wg.Add(2)
+	// 1. keep fetching the downstream data when client connected, and put it into BatchBuffer.
+	// 2. When reaches the buffer limit or receives a timer flush signal, and put BatchBuffer into flushChannel.
+	go func() {
+		defer wg.Done()
+		// stop accepting input once this worker is gone.
+		defer func() { s.logicInput = nil }()
+		timeTicker := time.NewTicker(time.Duration(s.config.FlushTime) * time.Millisecond)
+		// release the ticker when the worker exits.
+		defer timeTicker.Stop()
+		// flush hands the current buffer to the forwarding worker and starts a new one.
+		// It returns false when ctx is cancelled first; the buffer is then kept so that
+		// Shutdown can forward it.
+		flush := func() bool {
+			select {
+			case s.flushChannel <- s.buffer:
+				s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
+				return true
+			case <-ctx.Done():
+				return false
+			}
+		}
+		for {
+			select {
+			case status := <-s.listener:
+				switch status {
+				case client.Connected:
+					log.Logger.Infof("sender module of %s namespace is notified the connection connected", s.config.NamespaceName)
+					s.logicInput = s.physicalInput
+				case client.Disconnect:
+					log.Logger.Infof("sender module of %s namespace is notified the connection disconnected", s.config.NamespaceName)
+					// a nil channel is never ready, so the input case below is disabled.
+					s.logicInput = nil
+				}
+			case <-timeTicker.C:
+				if s.buffer.Len() > s.config.MinFlushEvents && !flush() {
+					return
+				}
+			case e := <-s.logicInput:
+				s.buffer.Add(e)
+				if s.buffer.Len() == s.config.MaxBufferSize && !flush() {
+					return
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+	// Keep fetching BatchBuffer to forward.
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case b := <-s.flushChannel:
+				s.consume(b)
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+	wg.Wait()
+	// Both workers have stopped, so Shutdown owns the buffer and the channels exclusively.
+	s.Shutdown()
+}
+
+// Shutdown closes the channels and tries to force forward the events in the buffer.
+// It must be called only once and only when no worker is still sending to flushChannel;
+// Boot guarantees both by calling it after its workers have returned.
+func (s *Sender) Shutdown() {
+	log.Logger.Infof("sender module of %s namespace is closing", s.config.NamespaceName)
+	// logicInput is nil while disconnected or after Boot has stopped; closing nil would panic.
+	if in := s.logicInput; in != nil {
+		s.logicInput = nil
+		close(in)
+	}
+	// Close before ranging: a range over an open channel never terminates.
+	close(s.flushChannel)
+	for buf := range s.flushChannel {
+		s.consume(buf)
+	}
+	s.consume(s.buffer)
+}
+
+// consume would forward the events by type and ack this batch.
+// The remote events of the batch are grouped by event type, each forwarder receives the
+// group matching its ForwardType, and a failed forward is handed to the fallbacker. When
+// the fallbacker also fails and the client still reports connected, the error is reported
+// to the client. The batch is acked with its last offset regardless of the outcome.
+func (s *Sender) consume(batch *buffer.BatchBuffer) {
+	log.Logger.Infof("sender module of %s namespace is flushing a new batch buffer."+
+		" the start offset is %s, and the size is %d", s.config.NamespaceName, batch.Last(), batch.Len())
+	var events = make(map[event.Type]event.BatchEvents)
+	for i := 0; i < batch.Len(); i++ {
+		eventContext := batch.Buf()[i]
+		for _, e := range eventContext.Context {
+			if e.IsRemote() {
+				events[e.Type()] = append(events[e.Type()], e)
+			}
+		}
+	}
+
+	for _, f := range s.runningForwarders {
+		// a direct lookup replaces scanning every event type for each forwarder.
+		batchEvents, ok := events[f.ForwardType()]
+		if !ok {
+			continue
+		}
+		if err := f.Forward(s.runningClient.GetConnectedClient(), batchEvents); err == nil {
+			continue
+		}
+		if !s.runningFallbacker.FallBack(batchEvents, s.runningClient.GetConnectedClient(), f.Forward) &&
+			s.runningClient.IsConnected() {
+			s.runningClient.ReportErr()
+		}
+	}
+	s.gatherer.Ack(batch.Last())
+}
+
+// InputDataChannel returns the current logic input channel. The returned channel is nil
+// whenever logicInput is nil (e.g. after a Disconnect status); sending to it then blocks.
+func (s *Sender) InputDataChannel() chan<- *event.OutputEventContext {
+	return s.logicInput
+}
diff --git a/internal/satellite/sharing/sharing_plugins.go b/internal/satellite/sharing/sharing_plugins.go
new file mode 100644
index 0000000..ee60398
--- /dev/null
+++ b/internal/satellite/sharing/sharing_plugins.go
@@ -0,0 +1,67 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sharing
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/internal/satellite/config"
+	client "github.com/apache/skywalking-satellite/plugins/client/api"
+	server "github.com/apache/skywalking-satellite/plugins/server/api"
+)
+
+// Manager contains the sharing plugins, only supports client and server plugins.
+// It is allocated eagerly because Load inserts into it and writing to a nil map panics.
+var Manager = make(map[string]plugin.SharingPlugin)
+var once sync.Once
+
+// Load loads the sharing config to the Manager. Every configured client and server plugin
+// is created and stored under its plugin name. Only the first call has any effect.
+func Load(cfg *config.SharingConfig) {
+	once.Do(func() {
+		for _, c := range cfg.Clients {
+			p := client.GetClient(c)
+			Manager[p.Name()] = p
+		}
+		for _, c := range cfg.Servers {
+			p := server.GetServer(c)
+			Manager[p.Name()] = p
+		}
+	},
+	)
+}
+
+// Prepare prepares every sharing plugin held by the Manager. On the first failure it logs
+// the error, closes the sharing plugins and returns an error wrapping the cause.
+func Prepare() error {
+	for _, sharingPlugin := range Manager {
+		if err := sharingPlugin.Prepare(); err != nil {
+			log.Logger.Errorf("error in preparing the %s sharing plugin: %v", sharingPlugin.Name(), err)
+			Close()
+			return fmt.Errorf("cannot prepare the sharing plugins named %s: %w", sharingPlugin.Name(), err)
+		}
+	}
+	return nil
+}
+
+// Close closes every sharing plugin held by the Manager. A failure is logged and does not
+// stop the remaining plugins from being closed.
+func Close() {
+	for _, p := range Manager {
+		err := p.Close()
+		if err == nil {
+			continue
+		}
+		log.Logger.Errorf("error in closing the %s sharing plugin: %v", p.Name(), err)
+	}
+}
diff --git a/plugins/client/api/client.go b/plugins/client/api/client.go
index 51d2b65..87a1ffd 100644
--- a/plugins/client/api/client.go
+++ b/plugins/client/api/client.go
@@ -18,30 +18,29 @@
 package api
 
 import (
-	"reflect"
-
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
 
+// The client statuses. The zero value is skipped, so an unset ClientStatus matches
+// neither Connected nor Disconnect.
+const (
+	_ ClientStatus = iota
+	Connected
+	Disconnect
+)
+
+// ClientStatus represents the status of the client.
+type ClientStatus int8
+
 // Client is a plugin interface, that defines new clients, such as gRPC client and Kafka client.
 type Client interface {
-	plugin.Plugin
+	plugin.SharingPlugin
 
-	// Prepare would make connection with outer service.
-	Prepare()
+	// IsConnected returns the status of the client.
+	IsConnected() bool
 	// GetConnection returns the connected client to publish events.
 	GetConnectedClient() interface{}
-	// Close the connection with outer service.
-	Close()
-}
-
-var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
-
-// Get client plugin.
-func GetClient(pluginName string, config map[string]interface{}) Client {
-	return plugin.Get(ClientCategory, pluginName, config).(Client)
-}
-
-func init() {
-	plugin.AddPluginCategory(ClientCategory)
+	// RegisterListener registers a listener channel that receives the client status.
+	RegisterListener(chan<- ClientStatus)
+	// ReportErr reports a client connection error to notify other listeners.
+	ReportErr()
 }
diff --git a/main.go b/plugins/client/api/client_repository.go
similarity index 59%
copy from main.go
copy to plugins/client/api/client_repository.go
index 94fbc40..1368fed 100644
--- a/main.go
+++ b/plugins/client/api/client_repository.go
@@ -15,8 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package main
+package api
 
-func main() {
-	print("OK")
+import (
+	"reflect"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+// GetClient returns an initialized client plugin for the given plugin config.
+func GetClient(config plugin.Config) Client {
+	return plugin.Get(reflect.TypeOf((*Client)(nil)).Elem(), config).(Client)
+}
+
+// RegisterClientPlugins registers the Client plugin category and every client plugin
+// listed below.
+func RegisterClientPlugins() {
+	plugin.RegisterPluginCategory(reflect.TypeOf((*Client)(nil)).Elem())
+	clients := []Client{
+		// Please register the client plugins at here.
+	}
+	for _, client := range clients {
+		plugin.RegisterPlugin(client)
+	}
 }
diff --git a/plugins/collector/api/collector.go b/plugins/collector/api/collector.go
deleted file mode 100644
index 374d63a..0000000
--- a/plugins/collector/api/collector.go
+++ /dev/null
@@ -1,59 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package api
-
-import (
-	"reflect"
-
-	"github.com/apache/skywalking-satellite/internal/pkg/event"
-	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
-)
-
-//   Init()     Initial stage: Init plugin by config
-//    ||
-//    \/
-//   Prepare()   Preparing stage: Prepare the collector, such as build connection with SkyWalking javaagent.
-//    ||
-//    \/
-//   Next()     Running stage: When Collector collect a data, the data would be fetched by the upstream
-//    ||                       component through this method.
-//    \/
-//   Close()    Closing stage: Close the Collector, such as close connection with SkyWalking javaagent.
-
-// Collector is a plugin interface, that defines new collectors.
-type Collector interface {
-	plugin.Plugin
-
-	// Prepare creates a listen or reader to gather data.
-	Prepare()
-	// Next return the data from the input.
-	Next() (event.SerializableEvent, error)
-	// Close would close collector.
-	Close()
-}
-
-var CollectorCategory = reflect.TypeOf((*Collector)(nil)).Elem()
-
-// Get collector plugin.
-func GetCollector(pluginName string, config map[string]interface{}) Collector {
-	return plugin.Get(CollectorCategory, pluginName, config).(Collector)
-}
-
-func init() {
-	plugin.AddPluginCategory(CollectorCategory)
-}
diff --git a/plugins/fallbacker/api/fallbacker.go b/plugins/fallbacker/api/fallbacker.go
index bde56ef..41a9bd7 100644
--- a/plugins/fallbacker/api/fallbacker.go
+++ b/plugins/fallbacker/api/fallbacker.go
@@ -18,27 +18,16 @@
 package api
 
 import (
-	"reflect"
-
 	"github.com/apache/skywalking-satellite/internal/pkg/event"
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/plugins/forwarder/api"
 )
 
 // Fallbacker is a plugin interface, that defines some fallback strategies.
 type Fallbacker interface {
 	plugin.Plugin
-
 	//  FallBack returns nil when finishing a successful process and returns a new Fallbacker when failure.
-	FallBack(batch event.BatchEvents) Fallbacker
-}
-
-var FallbackerCategory = reflect.TypeOf((*Fallbacker)(nil)).Elem()
-
-// Get Fallbacker plugin.
-func GetFallbacker(pluginName string, config map[string]interface{}) Fallbacker {
-	return plugin.Get(FallbackerCategory, pluginName, config).(Fallbacker)
+	FallBack(batch event.BatchEvents, connection interface{}, forward api.ForwardFunc) bool
 }
 
-func init() {
-	plugin.AddPluginCategory(FallbackerCategory)
-}
+// DisconnectionCallback is a callback that takes no arguments and returns nothing.
+// NOTE(review): presumably invoked when the connection is lost; no caller is visible here, confirm.
+type DisconnectionCallback func()
diff --git a/plugins/client/api/client.go b/plugins/fallbacker/api/fallbacker_repository.go
similarity index 59%
copy from plugins/client/api/client.go
copy to plugins/fallbacker/api/fallbacker_repository.go
index 51d2b65..ef8949d 100644
--- a/plugins/client/api/client.go
+++ b/plugins/fallbacker/api/fallbacker_repository.go
@@ -21,27 +21,22 @@ import (
 	"reflect"
 
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/plugins/fallbacker/timer"
 )
 
-// Client is a plugin interface, that defines new clients, such as gRPC client and Kafka client.
-type Client interface {
-	plugin.Plugin
-
-	// Prepare would make connection with outer service.
-	Prepare()
-	// GetConnection returns the connected client to publish events.
-	GetConnectedClient() interface{}
-	// Close the connection with outer service.
-	Close()
-}
-
-var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
-
-// Get client plugin.
-func GetClient(pluginName string, config map[string]interface{}) Client {
-	return plugin.Get(ClientCategory, pluginName, config).(Client)
+// GetFallbacker returns an initialized fallbacker plugin for the given plugin config.
+func GetFallbacker(config plugin.Config) Fallbacker {
+	return plugin.Get(reflect.TypeOf((*Fallbacker)(nil)).Elem(), config).(Fallbacker)
 }
 
-func init() {
-	plugin.AddPluginCategory(ClientCategory)
+// RegisterFallbackerPlugins registers the Fallbacker plugin category and every fallbacker
+// plugin listed below.
+func RegisterFallbackerPlugins() {
+	plugin.RegisterPluginCategory(reflect.TypeOf((*Fallbacker)(nil)).Elem())
+	fallbackers := []Fallbacker{
+		// Please register the fallbacker plugins at here.
+		&timer.Fallbacker{},
+	}
+	for _, fallbacker := range fallbackers {
+		plugin.RegisterPlugin(fallbacker)
+	}
 }
diff --git a/plugins/fallbacker/timer/timer_fallbacker.go b/plugins/fallbacker/timer/timer_fallbacker.go
new file mode 100644
index 0000000..a70ce0e
--- /dev/null
+++ b/plugins/fallbacker/timer/timer_fallbacker.go
@@ -0,0 +1,64 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package timer
+
+import (
+	"time"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/plugins/forwarder/api"
+)
+
+// Fallbacker is a timer fallbacker when forward fails. `latencyFactor` is the standard retry duration,
+// and the time for each retry is expanded by 2 times until the number of retries reaches the maximum.
+// NOTE(review): both fields are unexported although they carry mapstructure tags; confirm
+// that the config decoder is able to populate them.
+type Fallbacker struct {
+	maxTimes      int `mapstructure:"max_times"`      // upper bound used by the retry loop in FallBack
+	latencyFactor int `mapstructure:"latency_factor"` // first retry delay, in milliseconds
+}
+
+// Name returns the plugin name, "timer-fallbacker".
+func (t *Fallbacker) Name() string {
+	return "timer-fallbacker"
+}
+
+// Description returns a one-line, human-readable description of the plugin.
+func (t *Fallbacker) Description() string {
+	return "this is a timer trigger when forward fails."
+}
+
+// DefaultConfig returns the default plugin configuration as YAML text:
+// max_times of 3 and latency_factor of 2000.
+func (t *Fallbacker) DefaultConfig() string {
+	return `
+max_times: 3
+latency_factor: 2000
+`
+}
+
+// FallBack forwards the batch through forward and retries on failure. The first attempt is
+// immediate; every following attempt waits first, starting at latencyFactor milliseconds and
+// doubling after each failure. At most maxTimes attempts are made in total (the immediate
+// one included, which always happens). It returns true as soon as one attempt succeeds and
+// false when every attempt failed.
+func (t *Fallbacker) FallBack(batch event.BatchEvents, connection interface{}, forward api.ForwardFunc) bool {
+	if err := forward(connection, batch); err == nil {
+		return true
+	}
+	currentLatency := t.latencyFactor
+	// count tracks the attempts already made; it has to advance or the loop never ends.
+	for count := 1; count < t.maxTimes; count++ {
+		time.Sleep(time.Duration(currentLatency) * time.Millisecond)
+		if err := forward(connection, batch); err == nil {
+			return true
+		}
+		currentLatency *= 2
+	}
+	return false
+}
diff --git a/main.go b/plugins/fetcher/api/fetcher.go
similarity index 71%
copy from main.go
copy to plugins/fetcher/api/fetcher.go
index 94fbc40..4021473 100644
--- a/main.go
+++ b/plugins/fetcher/api/fetcher.go
@@ -15,8 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package main
+package api
 
-func main() {
-	print("OK")
+import (
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+// Fetcher is a plugin interface, that defines new fetchers.
+type Fetcher interface {
+	plugin.Plugin
+
+	// Fetch would fetch some APM data and return it as serializable events.
+	Fetch() []event.SerializableEvent
 }
diff --git a/plugins/client/api/client.go b/plugins/fetcher/api/fetcher_repository.go
similarity index 59%
copy from plugins/client/api/client.go
copy to plugins/fetcher/api/fetcher_repository.go
index 51d2b65..baecdff 100644
--- a/plugins/client/api/client.go
+++ b/plugins/fetcher/api/fetcher_repository.go
@@ -23,25 +23,18 @@ import (
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
 
-// Client is a plugin interface, that defines new clients, such as gRPC client and Kafka client.
-type Client interface {
-	plugin.Plugin
-
-	// Prepare would make connection with outer service.
-	Prepare()
-	// GetConnection returns the connected client to publish events.
-	GetConnectedClient() interface{}
-	// Close the connection with outer service.
-	Close()
-}
-
-var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
-
-// Get client plugin.
-func GetClient(pluginName string, config map[string]interface{}) Client {
-	return plugin.Get(ClientCategory, pluginName, config).(Client)
+// GetFetcher returns an initialized fetcher plugin for the given plugin config.
+func GetFetcher(config plugin.Config) Fetcher {
+	return plugin.Get(reflect.TypeOf((*Fetcher)(nil)).Elem(), config).(Fetcher)
 }
 
-func init() {
-	plugin.AddPluginCategory(ClientCategory)
+// RegisterFetcherPlugins registers the Fetcher plugin category and every fetcher plugin
+// listed below.
+func RegisterFetcherPlugins() {
+	plugin.RegisterPluginCategory(reflect.TypeOf((*Fetcher)(nil)).Elem())
+	fetchers := []Fetcher{
+		// Please register the fetcher plugins at here.
+	}
+	for _, fetcher := range fetchers {
+		plugin.RegisterPlugin(fetcher)
+	}
 }
diff --git a/plugins/filter/api/filter.go b/plugins/filter/api/filter.go
index 0ed43db..61ac376 100644
--- a/plugins/filter/api/filter.go
+++ b/plugins/filter/api/filter.go
@@ -18,33 +18,14 @@
 package api
 
 import (
-	"reflect"
-
 	"github.com/apache/skywalking-satellite/internal/pkg/event"
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
 
-//   Init()        Initiating stage: Init plugin by config
-//    ||
-//    \/
-//   Process()     Running stage:    Process the input event to convert to new event. During the processing,
-//                                   the method should also tag event type to mark the event category.
-
 // Filter is a plugin interface, that defines new pipeline filters.
 type Filter interface {
 	plugin.Plugin
 
-	// Process produces a new event by processing incoming event.
-	Process(in event.Event) event.Event
-}
-
-var FilterCategory = reflect.TypeOf((*Filter)(nil)).Elem()
-
-// Get filter plugin.
-func GetFilter(pluginName string, config map[string]interface{}) Filter {
-	return plugin.Get(FilterCategory, pluginName, config).(Filter)
-}
-
-func init() {
-	plugin.AddPluginCategory(FilterCategory)
+	// Process would put the needed event to the OutputEventContext.
+	Process(context *event.OutputEventContext)
 }
diff --git a/plugins/client/api/client.go b/plugins/filter/api/filter_repository.go
similarity index 59%
copy from plugins/client/api/client.go
copy to plugins/filter/api/filter_repository.go
index 51d2b65..5b69788 100644
--- a/plugins/client/api/client.go
+++ b/plugins/filter/api/filter_repository.go
@@ -23,25 +23,18 @@ import (
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
 
-// Client is a plugin interface, that defines new clients, such as gRPC client and Kafka client.
-type Client interface {
-	plugin.Plugin
-
-	// Prepare would make connection with outer service.
-	Prepare()
-	// GetConnection returns the connected client to publish events.
-	GetConnectedClient() interface{}
-	// Close the connection with outer service.
-	Close()
-}
-
-var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
-
-// Get client plugin.
-func GetClient(pluginName string, config map[string]interface{}) Client {
-	return plugin.Get(ClientCategory, pluginName, config).(Client)
+// GetFilter returns an initialized filter plugin for the given plugin config.
+func GetFilter(config plugin.Config) Filter {
+	return plugin.Get(reflect.TypeOf((*Filter)(nil)).Elem(), config).(Filter)
 }
 
-func init() {
-	plugin.AddPluginCategory(ClientCategory)
+// RegisterFilterPlugins registers the Filter plugin category and every filter plugin
+// listed below.
+func RegisterFilterPlugins() {
+	plugin.RegisterPluginCategory(reflect.TypeOf((*Filter)(nil)).Elem())
+	filters := []Filter{
+		// Please register the filter plugins at here.
+	}
+	for _, filter := range filters {
+		plugin.RegisterPlugin(filter)
+	}
 }
diff --git a/plugins/forwarder/api/forwarder.go b/plugins/forwarder/api/forwarder.go
index 9e99f14..df22ca4 100644
--- a/plugins/forwarder/api/forwarder.go
+++ b/plugins/forwarder/api/forwarder.go
@@ -18,40 +18,18 @@
 package api
 
 import (
-	"reflect"
-
 	"github.com/apache/skywalking-satellite/internal/pkg/event"
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
 
-//   Init()     Initiating stage: Init plugin by config
-//    ||
-//    \/
-//   Prepare()   Preparing stage: Prepare the Forwarder, such as get remote client.
-//    ||
-//    \/
-//   Forward()  Running stage: Forward the batch events
-//    ||
-//    \/
-//   Close()    Closing stage: Close the Collector, such as close connection with SkyWalking javaagent.
-
 // Forwarder is a plugin interface, that defines new forwarders.
 type Forwarder interface {
 	plugin.Plugin
-
 	// Forward the batch events to the external services, such as Kafka MQ and SkyWalking OAP cluster.
-	Forward(batch event.BatchEvents)
-
-	// ForwardType returns the supporting event type that could be forwarded.
+	Forward(connection interface{}, batch event.BatchEvents) error
+	// ForwardType returns the supported event type.
 	ForwardType() event.Type
 }
 
-var ForwarderCategory = reflect.TypeOf((*Forwarder)(nil)).Elem()
-
-func GetForwarder(pluginName string, config map[string]interface{}) Forwarder {
-	return plugin.Get(ForwarderCategory, pluginName, config).(Forwarder)
-}
-
-func init() {
-	plugin.AddPluginCategory(ForwarderCategory)
-}
+// ForwardFunc is a function type with the same signature as Forwarder.Forward.
+type ForwardFunc func(connection interface{}, batch event.BatchEvents) error
diff --git a/plugins/client/api/client.go b/plugins/forwarder/api/forwarder_repository.go
similarity index 59%
copy from plugins/client/api/client.go
copy to plugins/forwarder/api/forwarder_repository.go
index 51d2b65..89db43e 100644
--- a/plugins/client/api/client.go
+++ b/plugins/forwarder/api/forwarder_repository.go
@@ -23,25 +23,18 @@ import (
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
 
-// Client is a plugin interface, that defines new clients, such as gRPC client and Kafka client.
-type Client interface {
-	plugin.Plugin
-
-	// Prepare would make connection with outer service.
-	Prepare()
-	// GetConnection returns the connected client to publish events.
-	GetConnectedClient() interface{}
-	// Close the connection with outer service.
-	Close()
-}
-
-var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
-
-// Get client plugin.
-func GetClient(pluginName string, config map[string]interface{}) Client {
-	return plugin.Get(ClientCategory, pluginName, config).(Client)
+// GetForwarder returns an initialized forwarder plugin for the given plugin config.
+func GetForwarder(config plugin.Config) Forwarder {
+	return plugin.Get(reflect.TypeOf((*Forwarder)(nil)).Elem(), config).(Forwarder)
 }
 
-func init() {
-	plugin.AddPluginCategory(ClientCategory)
+// RegisterForwarderPlugins registers the Forwarder plugin category and every forwarder
+// plugin listed below.
+func RegisterForwarderPlugins() {
+	plugin.RegisterPluginCategory(reflect.TypeOf((*Forwarder)(nil)).Elem())
+	forwarders := []Forwarder{
+		// Please register the forwarder plugins at here.
+	}
+	for _, forwarder := range forwarders {
+		plugin.RegisterPlugin(forwarder)
+	}
 }
diff --git a/plugins/init.go b/plugins/init.go
new file mode 100644
index 0000000..2bf3bf4
--- /dev/null
+++ b/plugins/init.go
@@ -0,0 +1,45 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plugins
+
+import (
+	client "github.com/apache/skywalking-satellite/plugins/client/api"
+	fallbacker "github.com/apache/skywalking-satellite/plugins/fallbacker/api"
+	fetcher "github.com/apache/skywalking-satellite/plugins/fetcher/api"
+	filter "github.com/apache/skywalking-satellite/plugins/filter/api"
+	forwarder "github.com/apache/skywalking-satellite/plugins/forwarder/api"
+	parser "github.com/apache/skywalking-satellite/plugins/parser/api"
+	queue "github.com/apache/skywalking-satellite/plugins/queue/api"
+	receiver "github.com/apache/skywalking-satellite/plugins/receiver/api"
+	server "github.com/apache/skywalking-satellite/plugins/server/api"
+)
+
+// RegisterPlugins registers all supported plugin categories and plugin types to the registry.
+func RegisterPlugins() {
+	// plugins
+	filter.RegisterFilterPlugins()
+	forwarder.RegisterForwarderPlugins()
+	parser.RegisterParserPlugins()
+	queue.RegisterQueuePlugins()
+	receiver.RegisterReceiverPlugins()
+	fetcher.RegisterFetcherPlugins()
+	fallbacker.RegisterFallbackerPlugins()
+	// sharing plugins
+	server.RegisterServerPlugins()
+	client.RegisterClientPlugins()
+}
diff --git a/plugins/parser/api/parser.go b/plugins/parser/api/parser.go
index 3ca5ebe..754037f 100644
--- a/plugins/parser/api/parser.go
+++ b/plugins/parser/api/parser.go
@@ -18,15 +18,10 @@
 package api
 
 import (
-	"reflect"
-
 	"github.com/apache/skywalking-satellite/internal/pkg/event"
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
 
-//
-// Collector ==> RawData ==> Parser ==> SerializableEvent
-//
 // Parser is a plugin interface, that defines new Parsers for Collector plugin.
 type Parser interface {
 	plugin.Plugin
@@ -37,13 +32,3 @@ type Parser interface {
 	// ParseStr parse the string into events.
 	ParseStr(str string) ([]event.SerializableEvent, error)
 }
-
-var ParserCategory = reflect.TypeOf((*Parser)(nil)).Elem()
-
-func GetParser(pluginName string, config map[string]interface{}) Parser {
-	return plugin.Get(ParserCategory, pluginName, config).(Parser)
-}
-
-func init() {
-	plugin.AddPluginCategory(ParserCategory)
-}
diff --git a/plugins/client/api/client.go b/plugins/parser/api/parser_repository.go
similarity index 59%
copy from plugins/client/api/client.go
copy to plugins/parser/api/parser_repository.go
index 51d2b65..4abdb56 100644
--- a/plugins/client/api/client.go
+++ b/plugins/parser/api/parser_repository.go
@@ -23,25 +23,18 @@ import (
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
 
-// Client is a plugin interface, that defines new clients, such as gRPC client and Kafka client.
-type Client interface {
-	plugin.Plugin
-
-	// Prepare would make connection with outer service.
-	Prepare()
-	// GetConnection returns the connected client to publish events.
-	GetConnectedClient() interface{}
-	// Close the connection with outer service.
-	Close()
-}
-
-var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
-
-// Get client plugin.
-func GetClient(pluginName string, config map[string]interface{}) Client {
-	return plugin.Get(ClientCategory, pluginName, config).(Client)
+// GetParser gets an initialized parser plugin.
+func GetParser(config plugin.Config) Parser {
+	return plugin.Get(reflect.TypeOf((*Parser)(nil)).Elem(), config).(Parser)
 }
 
-func init() {
-	plugin.AddPluginCategory(ClientCategory)
+// RegisterParserPlugins registers the used parser plugins.
+func RegisterParserPlugins() {
+	plugin.RegisterPluginCategory(reflect.TypeOf((*Parser)(nil)).Elem())
+	parsers := []Parser{
+		// Please register the parser plugins here.
+	}
+	for _, parser := range parsers {
+		plugin.RegisterPlugin(parser)
+	}
 }
diff --git a/plugins/queue/api/queue.go b/plugins/queue/api/queue.go
index 9b95156..f25b2ff 100644
--- a/plugins/queue/api/queue.go
+++ b/plugins/queue/api/queue.go
@@ -18,8 +18,6 @@
 package api
 
 import (
-	"reflect"
-
 	"github.com/apache/skywalking-satellite/internal/pkg/event"
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
@@ -28,34 +26,24 @@ import (
 type Queue interface {
 	plugin.Plugin
 
-	// Publisher get the only publisher for the current queue.
-	Publisher() QueuePublisher
-
-	// Consumer get the only consumer for the current queue.
-	Consumer() QueueConsumer
-
-	// Close would close the queue.
-	Close()
-}
+	// Prepare creates the queue.
+	Prepare() error
 
-// QueuePublisher is a plugin interface, that defines new queue publishers.
-type QueuePublisher interface {
-	// Enqueue push a inputEvent into the queue.
-	Enqueue(event *event.SerializableEvent) error
-}
+	// Push pushes an event into the queue.
+	Push(event event.SerializableEvent) error
 
-// QueueConsumer is a plugin interface, that defines new queue consumers.
-type QueueConsumer interface {
-	// Dequeue pop an event form the Queue. When the queue is empty, the method would be blocked.
-	Dequeue() (event *event.SerializableEvent, offset int64, err error)
-}
+	// Pop returns a channel that delivers a SequenceEvent when the queue is not empty.
+	Pop() chan *SequenceEvent
 
-var QueueCategory = reflect.TypeOf((*Queue)(nil)).Elem()
+	// Close would close the queue.
+	Close() error
 
-func GetQueue(pluginName string, config map[string]interface{}) Queue {
-	return plugin.Get(QueueCategory, pluginName, config).(Queue)
+	// Ack acknowledges the lastOffset.
+	Ack(lastOffset event.Offset)
 }
 
-func init() {
-	plugin.AddPluginCategory(QueueCategory)
+// SequenceEvent is a wrapper to pass the event and the offset.
+type SequenceEvent struct {
+	Event  event.Event
+	Offset event.Offset
 }
diff --git a/main.go b/plugins/queue/api/queue_repository.go
similarity index 59%
copy from main.go
copy to plugins/queue/api/queue_repository.go
index 94fbc40..11031d0 100644
--- a/main.go
+++ b/plugins/queue/api/queue_repository.go
@@ -15,8 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package main
+package api
 
-func main() {
-	print("OK")
+import (
+	"reflect"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+// GetQueue gets an initialized queue plugin.
+func GetQueue(config plugin.Config) Queue {
+	return plugin.Get(reflect.TypeOf((*Queue)(nil)).Elem(), config).(Queue)
+}
+
+// RegisterQueuePlugins registers the used queue plugins.
+func RegisterQueuePlugins() {
+	plugin.RegisterPluginCategory(reflect.TypeOf((*Queue)(nil)).Elem())
+	queues := []Queue{
+		// Please register the queue plugins here.
+	}
+	for _, queue := range queues {
+		plugin.RegisterPlugin(queue)
+	}
 }
diff --git a/plugins/fallbacker/api/fallbacker.go b/plugins/receiver/api/receiver.go
similarity index 63%
copy from plugins/fallbacker/api/fallbacker.go
copy to plugins/receiver/api/receiver.go
index bde56ef..9e47692 100644
--- a/plugins/fallbacker/api/fallbacker.go
+++ b/plugins/receiver/api/receiver.go
@@ -18,27 +18,18 @@
 package api
 
 import (
-	"reflect"
-
 	"github.com/apache/skywalking-satellite/internal/pkg/event"
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/plugins/server/api"
 )
 
-// Fallbacker is a plugin interface, that defines some fallback strategies.
-type Fallbacker interface {
+// Receiver is a plugin interface, that defines new collectors.
+type Receiver interface {
 	plugin.Plugin
 
-	//  FallBack returns nil when finishing a successful process and returns a new Fallbacker when failure.
-	FallBack(batch event.BatchEvents) Fallbacker
-}
-
-var FallbackerCategory = reflect.TypeOf((*Fallbacker)(nil)).Elem()
-
-// Get Fallbacker plugin.
-func GetFallbacker(pluginName string, config map[string]interface{}) Fallbacker {
-	return plugin.Get(FallbackerCategory, pluginName, config).(Fallbacker)
-}
+	// RegisterHandler registers a handler to the server, such as to handle a gRPC or an HTTP request.
+	RegisterHandler(server api.Server)
 
-func init() {
-	plugin.AddPluginCategory(FallbackerCategory)
+	// Channel returns the channel into which data is put when the receiver receives APM data.
+	Channel() <-chan event.SerializableEvent
 }
diff --git a/plugins/client/api/client.go b/plugins/receiver/api/receiver_repository.go
similarity index 59%
copy from plugins/client/api/client.go
copy to plugins/receiver/api/receiver_repository.go
index 51d2b65..cbf9cbc 100644
--- a/plugins/client/api/client.go
+++ b/plugins/receiver/api/receiver_repository.go
@@ -23,25 +23,18 @@ import (
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
 
-// Client is a plugin interface, that defines new clients, such as gRPC client and Kafka client.
-type Client interface {
-	plugin.Plugin
-
-	// Prepare would make connection with outer service.
-	Prepare()
-	// GetConnection returns the connected client to publish events.
-	GetConnectedClient() interface{}
-	// Close the connection with outer service.
-	Close()
-}
-
-var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
-
-// Get client plugin.
-func GetClient(pluginName string, config map[string]interface{}) Client {
-	return plugin.Get(ClientCategory, pluginName, config).(Client)
+// GetReceiver gets an initialized receiver plugin.
+func GetReceiver(config plugin.Config) Receiver {
+	return plugin.Get(reflect.TypeOf((*Receiver)(nil)).Elem(), config).(Receiver)
 }
 
-func init() {
-	plugin.AddPluginCategory(ClientCategory)
+// RegisterReceiverPlugins registers the used receiver plugins.
+func RegisterReceiverPlugins() {
+	plugin.RegisterPluginCategory(reflect.TypeOf((*Receiver)(nil)).Elem())
+	receivers := []Receiver{
+		// Please register the receiver plugins here.
+	}
+	for _, receiver := range receivers {
+		plugin.RegisterPlugin(receiver)
+	}
 }
diff --git a/plugins/collector/log-grpc/README.md b/plugins/receiver/log-grpc/README.md
similarity index 100%
rename from plugins/collector/log-grpc/README.md
rename to plugins/receiver/log-grpc/README.md
diff --git a/main.go b/plugins/server/api/server.go
similarity index 74%
copy from main.go
copy to plugins/server/api/server.go
index 94fbc40..09d3f2b 100644
--- a/main.go
+++ b/plugins/server/api/server.go
@@ -15,8 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package main
+package api
 
-func main() {
-	print("OK")
+import "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+
+// Server is a plugin interface, that defines new servers, such as gRPC server and http server.
+type Server interface {
+	plugin.SharingPlugin
+
+	// Start a server to receive the input APM data.
+	Start() error
 }
diff --git a/main.go b/plugins/server/api/server_repository.go
similarity index 59%
rename from main.go
rename to plugins/server/api/server_repository.go
index 94fbc40..f2f1dae 100644
--- a/main.go
+++ b/plugins/server/api/server_repository.go
@@ -15,8 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package main
+package api
 
-func main() {
-	print("OK")
+import (
+	"reflect"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+// GetServer gets an initialized server plugin.
+func GetServer(config plugin.Config) Server {
+	return plugin.Get(reflect.TypeOf((*Server)(nil)).Elem(), config).(Server)
+}
+
+// RegisterServerPlugins registers the used server plugins.
+func RegisterServerPlugins() {
+	plugin.RegisterPluginCategory(reflect.TypeOf((*Server)(nil)).Elem())
+	servers := []Server{
+		// Please register the server plugins here.
+	}
+	for _, server := range servers {
+		plugin.RegisterPlugin(server)
+	}
 }