You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by li...@apache.org on 2020/12/08 11:13:52 UTC

[skywalking-satellite] branch boot-framework created (now c0939b1)

This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a change to branch boot-framework
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git.


      at c0939b1  add satellite main structure

This branch includes the following new commits:

     new c0939b1  add satellite main structure

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking-satellite] 01/01: add satellite main structure

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a commit to branch boot-framework
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git

commit c0939b1f361f071c4fe4ba668dfca0264397acfc
Author: Evan <ev...@outlook.com>
AuthorDate: Tue Dec 8 19:13:07 2020 +0800

    add satellite main structure
---
 main.go => cmd/command.go                          |  33 ++-
 main.go => cmd/main.go                             |  22 +-
 configs/{config.yaml => satellite_config.yaml}     |  61 ++--
 dist/LICENSE                                       |   4 +-
 dist/licenses/LICENSE-cli                          |  21 ++
 dist/licenses/LICENSE-viper                        |  21 ++
 go.mod                                             |   6 +-
 go.sum                                             | 306 +++++++++++++++++++++
 main.go => internal/pkg/constant/constant.go       |  15 +-
 internal/pkg/event/event.go                        |   8 +-
 internal/pkg/{logger => log}/log.go                |  92 +++++--
 internal/pkg/{logger => log}/log_test.go           |  44 ++-
 internal/pkg/plugin/default_plugin.go              |  76 +++++
 main.go => internal/pkg/plugin/definition.go       |  19 +-
 internal/pkg/plugin/plugin_test.go                 | 101 +++++++
 internal/pkg/plugin/registry.go                    |  65 +++--
 main.go => internal/pkg/test/path.go               |  18 +-
 internal/satellite/boot/boot.go                    | 105 +++++++
 .../satellite/config/clientmanager_config.go       |  34 +--
 .../satellite/config/gatherer_config.go            |  44 +--
 internal/satellite/config/loader.go                |  68 +++++
 internal/satellite/config/loader_test.go           | 133 +++++++++
 .../satellite/config/processor_config.go           |  23 +-
 internal/satellite/config/satellite_config.go      |  51 ++++
 .../satellite/config/sender_config.go              |  36 ++-
 internal/satellite/module/buffer/buffer.go         |  94 +++++++
 .../satellite/module/buffer/buffer_test.go         |  65 ++---
 internal/satellite/module/clientmanager.go         | 182 ++++++++++++
 internal/satellite/module/definition.go            |  98 +++++++
 internal/satellite/module/gatherer.go              | 112 ++++++++
 internal/satellite/module/processor.go             | 115 ++++++++
 internal/satellite/module/sender.go                | 181 ++++++++++++
 internal/pkg/plugin/define.go => plugins/README.md |  28 +-
 plugins/client/api/client.go                       |  19 +-
 plugins/client/example/client.go                   |  67 -----
 plugins/collector/api/collector.go                 |  17 +-
 plugins/collector/example/collector.go             |  70 -----
 plugins/collector/example/collector_test.go        |  71 -----
 plugins/fallbacker/api/fallbacker.go               |  13 +-
 plugins/fallbacker/example/fallbacker.go           |  57 ----
 plugins/fallbacker/example/fallbacker_test.go      |  71 -----
 plugins/fallbacker/timer/timer_fallbacker.go       |  62 +++++
 plugins/filter/api/filter.go                       |  10 +-
 plugins/filter/example/filter_test.go              |  71 -----
 plugins/forwarder/api/forwarder.go                 |  27 +-
 plugins/forwarder/example/forwarder.go             |  62 -----
 plugins/forwarder/example/forwarder_test.go        |  71 -----
 plugins/parser/api/parser.go                       |   8 +-
 plugins/parser/example/parser.go                   |  62 -----
 plugins/parser/example/parser_test.go              |  71 -----
 plugins/queue/api/queue.go                         |  18 +-
 plugins/queue/example/queue.go                     |  72 -----
 plugins/queue/example/queue_test.go                |  71 -----
 53 files changed, 2171 insertions(+), 1100 deletions(-)

diff --git a/main.go b/cmd/command.go
similarity index 54%
copy from main.go
copy to cmd/command.go
index 94fbc40..0457071 100644
--- a/main.go
+++ b/cmd/command.go
@@ -17,6 +17,40 @@
 
 package main
 
-func main() {
-	print("OK")
+import (
+	"github.com/urfave/cli/v2"
+
+	"github.com/apache/skywalking-satellite/internal/satellite/boot"
+	"github.com/apache/skywalking-satellite/internal/satellite/config"
+)
+
+var (
+	// cmdStart is the "start" sub-command. Its action loads the configuration
+	// selected by the "config" flag and passes it to boot.Start.
+	cmdStart = cli.Command{
+		Name:  "start",
+		Usage: "start satellite",
+		Flags: []cli.Flag{
+			&cli.StringFlag{
+				// urfave/cli v2 does not parse the v1 "name, alias" form,
+				// so the short name has to be declared through Aliases.
+				Name:    "config",
+				Aliases: []string{"c"},
+				Usage:   "Load configuration from `FILE`",
+				// SATELLITE_CONFIG is the project-specific name; MOSN_CONFIG
+				// is still honored so existing setups keep working.
+				EnvVars: []string{"SATELLITE_CONFIG", "MOSN_CONFIG"},
+				Value:   "configs/satellite_config.yaml",
+			},
+		},
+		Action: func(c *cli.Context) error {
+			return boot.Start(loadConfig(c))
+		},
+	}
+)
+
+// loadConfig reads the path stored in the "config" flag of c and returns
+// whatever config.Load produces for that path.
+func loadConfig(c *cli.Context) *config.SatelliteConfig {
+	return config.Load(c.String("config"))
 }
diff --git a/main.go b/cmd/main.go
similarity index 61%
copy from main.go
copy to cmd/main.go
index 94fbc40..8c2d809 100644
--- a/main.go
+++ b/cmd/main.go
@@ -17,6 +17,35 @@
 
 package main
 
+import (
+	"os"
+	"time"
+
+	"github.com/urfave/cli/v2"
+)
+
+// Version is the version string reported by the CLI. It will be initialized
+// when building and otherwise keeps the "latest" placeholder.
+var Version = "latest"
+
+// main assembles the SkyWalking-Satellite CLI application, registers the
+// start command and runs it against the process arguments.
 func main() {
-	print("OK")
+	app := cli.NewApp()
+	app.Name = "SkyWalking-Satellite"
+	app.Version = Version
+	// NOTE(review): this records the time the process runs, not the build time.
+	app.Compiled = time.Now()
+	app.Usage = "Satellite is for collecting APM data."
+	app.Description = "A lightweight collector/sidecar could be deployed closing to the target monitored system, to collect metrics, traces, and logs."
+	app.Commands = []*cli.Command{
+		&cmdStart,
+	}
+	app.Action = cli.ShowAppHelp
+	// Surface failures instead of discarding them, so shells and supervisors
+	// see the message and a non-zero exit status.
+	if err := app.Run(os.Args); err != nil {
+		_, _ = os.Stderr.WriteString(err.Error() + "\n")
+		os.Exit(1)
+	}
 }
diff --git a/configs/config.yaml b/configs/satellite_config.yaml
similarity index 54%
rename from configs/config.yaml
rename to configs/satellite_config.yaml
index 232aecb..68caab2 100644
--- a/configs/config.yaml
+++ b/configs/satellite_config.yaml
@@ -15,34 +15,35 @@
 # limitations under the License.
 #
 
-Gatherer:
-  - name: segment-receiver
-    type: segment-receiver
-    config:
-      key: value
-      key2: value2
-    queue:
-      type: mmap-queue
-      config:
+logger:
+  log_pattern: "%time [%level][%field] - %msg"
+  time_pattern: "2006-01-02 15:04:05.001"
+  level: "info"
+
+agents:
+  - common_config:
+      namespace: namespace1
+    gatherer:
+      collector:
+        plugin_name: segment-receiver
+        k: v
+      queue:
+        plugin_name: mmap-queue
         key: value
-        key2: value2
-    processor: processor1
-Processor:
-  - name: processor1
-    filters:
-      - filtername1
-      - filtername2
-      - filtername3
-Sender:
-  client:
-    name: gRPC-client
-    type: gRPC
-    config:
-      key1: value1
-      key2: value2
-  forwarders:
-    - type: segment-forwarder
-      eventType: segment
-      config:
-        key1: value1
-        key2: value2
\ No newline at end of file
+    processor:
+      filters:
+        - plugin_name: filtertype1
+          key: value
+    sender:
+      flush_time: 5000
+      max_buffer_size: 100
+      forwarders:
+        - plugin_name: segment-forwarder
+          key: value
+  - common_config:
+      namespace: sharing
+    client_manager:
+      retry_interval: 5000
+      client:
+        plugin_name: "grpc"
+        k: v
diff --git a/dist/LICENSE b/dist/LICENSE
index 2ab0d2f..618fe55 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -231,4 +231,6 @@ MIT licenses
 The following components are provided under the MIT License. See project link for details.
 The text of each license is also included at licenses/LICENSE-[project].txt.
 
-	sirupsen (logrus) 1.7.0: https://github.com/sirupsen/logrus MIT
+	sirupsen (logrus) v1.7.0: https://github.com/sirupsen/logrus MIT
+	spf13 (viper) v1.7.1: https://github.com/spf13/viper MIT
+	urfave (cli) v2.3.0: https://github.com/urfave/cli MIT
diff --git a/dist/licenses/LICENSE-cli b/dist/licenses/LICENSE-cli
new file mode 100644
index 0000000..17356d3
--- /dev/null
+++ b/dist/licenses/LICENSE-cli
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2016 Jeremy Saenz & Contributors
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/dist/licenses/LICENSE-viper b/dist/licenses/LICENSE-viper
new file mode 100644
index 0000000..4527efb
--- /dev/null
+++ b/dist/licenses/LICENSE-viper
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2014 Steve Francia
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 71fd69d..438ceff 100644
--- a/go.mod
+++ b/go.mod
@@ -2,4 +2,8 @@ module github.com/apache/skywalking-satellite
 
 go 1.14
 
-require github.com/sirupsen/logrus v1.7.0
+require (
+	github.com/sirupsen/logrus v1.7.0
+	github.com/spf13/viper v1.7.1
+	github.com/urfave/cli/v2 v2.3.0
+)
diff --git a/go.sum b/go.sum
index 4d74a1e..89357d1 100644
--- a/go.sum
+++ b/go.sum
@@ -1,10 +1,316 @@
+cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
+cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU=
+cloud.google.com/go v0.44.2/go.mod h1:60680Gw3Yr4ikxnPRS/oxxkBccT6SA1yMk63TGekxKY=
+cloud.google.com/go v0.45.1/go.mod h1:RpBamKRgapWJb87xiFSdk4g1CME7QZg3uwTez+TSTjc=
+cloud.google.com/go v0.46.3/go.mod h1:a6bKKbmY7er1mI7TEI4lsAkts/mkhTSZK8w33B4RAg0=
+cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
+cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
+cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk=
+cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
+cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
+dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
+github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
+github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
+github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
+github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
+github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
+github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
+github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
+github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
+github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
+github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
+github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
+github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
+github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
+github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
+github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
+github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
+github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
+github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY=
+github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
+github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
+github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
+github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
+github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
+github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
+github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
+github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
+github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
+github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
+github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
+github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
+github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
+github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
+github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
+github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
+github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
+github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
+github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
+github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
+github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
+github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
+github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
+github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
+github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
+github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
+github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
+github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
+github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
+github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
+github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
+github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
+github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
+github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
+github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
+github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
+github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
+github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
+github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
+github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
+github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
+github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
+github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
+github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
+github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
+github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
+github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
+github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
+github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
+github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
+github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
+github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
+github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
+github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
+github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
+github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
+github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
+github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
+github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
+github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
+github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
+github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
+github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
+github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
+github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
+github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
+github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
+github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
+github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
+github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
+github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
+github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
+github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
+github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
+github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
+github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
+github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
+github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
+github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
+github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
+github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
+github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q=
+github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
+github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
+github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
+github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
+github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
+github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
 github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
+github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
+github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
+github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
+github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
+github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
+github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
+github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
+github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
+github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
+github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
+github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
+github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
+github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
+github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
+github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
+github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
+github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
+github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M=
+github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
+github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
+go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
+go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
+go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
+go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
+golang.org/x/exp v0.0.0-20190829153037-c13cbed26979/go.mod h1:86+5VVa7VpoJ4kLfm080zCjGlMRFzhUhsZKEZO7MGek=
+golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY=
+golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
+golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
+golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
+golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
+golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
+golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
+golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
+golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
+golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
+golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
+golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
+golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4=
 golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
+golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
+golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
+golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
+golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc h1:NCy3Ohtk6Iny5V/reW2Ktypo4zIpWBdRJ1uFMjBxdg8=
+golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
+google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
+google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
+google.golang.org/api v0.9.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
+google.golang.org/api v0.13.0/go.mod h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsbkAI=
+google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
+google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
+google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
+google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
+google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
+google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
+google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
+google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8=
+google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
+google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
+google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
+google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
+gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
+gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
+gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
+gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
+gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
+gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
+gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
+rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
diff --git a/main.go b/internal/pkg/constant/constant.go
similarity index 77%
copy from main.go
copy to internal/pkg/constant/constant.go
index 94fbc40..f1ce73b 100644
--- a/main.go
+++ b/internal/pkg/constant/constant.go
@@ -15,8 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package main
+package constant
 
-func main() {
-	print("OK")
-}
+const (
+	// project name
+	ProjectName = "skywalking-satellite"
+
+	// Module names
+	ClientManagerModule = "client-manager"
+	GathererModule      = "gatherer"
+	ProcessorModule     = "processor"
+	SenderModule        = "sender"
+)
diff --git a/internal/pkg/event/event.go b/internal/pkg/event/event.go
index c8ff0f2..20c6187 100644
--- a/internal/pkg/event/event.go
+++ b/internal/pkg/event/event.go
@@ -73,20 +73,18 @@ type BatchEvents []Event
 
 // OutputEventContext is a container to store the output context.
 type OutputEventContext struct {
-	context map[string]Event
+	Context map[string]Event
 	Offset  int64
 }
 
 // Put puts the incoming event into the context when the event is a remote event.
 func (c *OutputEventContext) Put(event Event) {
-	if event.IsRemote() {
-		c.context[event.Name()] = event
-	}
+	c.Context[event.Name()] = event
 }
 
 // Get returns a event in the context. When the eventName does not exist, a error would be returned.
 func (c *OutputEventContext) Get(eventName string) (Event, error) {
-	e, ok := c.context[eventName]
+	e, ok := c.Context[eventName]
 	if !ok {
 		err := fmt.Errorf("cannot find the event name in OutputEventContext : %s", eventName)
 		return nil, err
diff --git a/internal/pkg/logger/log.go b/internal/pkg/log/log.go
similarity index 51%
rename from internal/pkg/logger/log.go
rename to internal/pkg/log/log.go
index a62128d..9918dd4 100644
--- a/internal/pkg/logger/log.go
+++ b/internal/pkg/log/log.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package logger
+package log
 
 import (
 	"fmt"
@@ -32,57 +32,97 @@ const (
 	defaultTimePattern = "2006-01-02 15:04:05.001"
 )
 
+// LoggerConfig initializes the global logger config.
+type LoggerConfig struct {
+	LogPattern  string `mapstructure:"log_pattern"`
+	TimePattern string `mapstructure:"time_pattern"`
+	Level       string `mapstructure:"level"`
+}
+
+// FormatOption is a function to set formatter config.
+type FormatOption func(f *formatter)
+
+// ConfigOption is a function to set logger config.
+type ConfigOption func(l *logrus.Logger)
+
 type formatter struct {
 	logPattern  string
 	timePattern string
 }
 
-// Option is a function to set formatter config.
-type Option func(f *formatter)
-
-// Log is the global logger.
-var Log *logrus.Logger
+// Logger is the global logger.
+var Logger *logrus.Logger
 var once sync.Once
 
-// The Log init method, keep Log as a singleton.
-func Init(opts ...Option) {
+func Init(cfg *LoggerConfig) {
 	once.Do(func() {
-		if Log == nil {
-			Log = logrus.New()
-		}
-		Log.SetOutput(os.Stdout)
-		Log.SetLevel(logrus.InfoLevel)
-		f := &formatter{}
-		for _, opt := range opts {
-			opt(f)
-		}
-		if f.logPattern == "" {
-			f.logPattern = defaultLogPattern
+		var configOpts []ConfigOption
+		var formatOpts []FormatOption
+
+		if cfg.Level != "" {
+			configOpts = append(configOpts, SetLevel(cfg.Level))
+		} else {
+			configOpts = append(configOpts, SetLevel(logrus.InfoLevel.String()))
 		}
-		if f.timePattern == "" {
-			f.timePattern = defaultTimePattern
+		if cfg.TimePattern != "" {
+			formatOpts = append(formatOpts, SetTimePattern(cfg.TimePattern))
+		} else {
+			formatOpts = append(formatOpts, SetTimePattern(defaultTimePattern))
 		}
-		if !strings.Contains(f.logPattern, "\n") {
-			f.logPattern += "\n"
+		if cfg.LogPattern != "" {
+			formatOpts = append(formatOpts, SetLogPattern(cfg.LogPattern))
+		} else {
+			formatOpts = append(formatOpts, SetLogPattern(defaultLogPattern))
 		}
-		Log.SetFormatter(f)
+		initBySettings(configOpts, formatOpts)
 	})
 }
 
+// initBySettings replaces the global Logger with a fresh logrus logger that
+// writes to stdout, then applies configOpts to the logger and formatOpts to a
+// new formatter. It is not guarded by once itself; Init wraps it in once.Do.
+func initBySettings(configOpts []ConfigOption, formatOpts []FormatOption) {
+	// Default Logger.
+	Logger = logrus.New()
+	Logger.SetOutput(os.Stdout)
+	for _, opt := range configOpts {
+		opt(Logger)
+	}
+	// Default formatter.
+	f := &formatter{}
+	for _, opt := range formatOpts {
+		opt(f)
+	}
+	// Append a newline when the pattern contains none, so entries do not share a line.
+	if !strings.Contains(f.logPattern, "\n") {
+		f.logPattern += "\n"
+	}
+	Logger.SetFormatter(f)
+}
+
 // Set the log pattern in formatter.
-func SetLogPattern(logPattern string) Option {
+func SetLogPattern(logPattern string) FormatOption {
 	return func(f *formatter) {
 		f.logPattern = logPattern
 	}
 }
 
 // Set the time pattern in formatter.
-func SetTimePattern(timePattern string) Option {
+func SetTimePattern(timePattern string) FormatOption {
 	return func(f *formatter) {
 		f.timePattern = timePattern
 	}
 }
 
+// SetLevel sets the logger level from levelStr. When levelStr is not a level
+// known to logrus, a notice is printed and the level falls back to info.
+func SetLevel(levelStr string) ConfigOption {
+	return func(logger *logrus.Logger) {
+		level, err := logrus.ParseLevel(levelStr)
+		if err != nil {
+			// End the notice with a newline so it does not run into the next output line.
+			fmt.Printf("logger level does not exist: %s, level would be set info\n", levelStr)
+			level = logrus.InfoLevel
+		}
+		logger.SetLevel(level)
+	}
+}
+
 // Format supports unified log output format that has %time, %level, %field and %msg.
 func (f *formatter) Format(entry *logrus.Entry) ([]byte, error) {
 	output := f.logPattern
diff --git a/internal/pkg/logger/log_test.go b/internal/pkg/log/log_test.go
similarity index 66%
rename from internal/pkg/logger/log_test.go
rename to internal/pkg/log/log_test.go
index d8ad895..2c690d8 100644
--- a/internal/pkg/logger/log_test.go
+++ b/internal/pkg/log/log_test.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package logger
+package log
 
 import (
 	"reflect"
@@ -26,7 +26,13 @@ import (
 )
 
 func TestFormatter_Format(t *testing.T) {
-	Init(SetLogPattern("[%time][%level][%field] - %msg"), SetTimePattern("2006-01-02 15:04:05,001"))
+	initBySettings([]ConfigOption{
+		SetLevel("warn"),
+	},
+		[]FormatOption{
+			SetLogPattern("[%time][%level][%field] - %msg"),
+			SetTimePattern("2006-01-02 15:04:05,001"),
+		})
 	type args struct {
 		entry *logrus.Entry
 	}
@@ -41,7 +47,7 @@ func TestFormatter_Format(t *testing.T) {
 			want: []byte("[2020-12-12 12:12:12,012][trace][] - entry1\n"),
 			args: args{
 				entry: func() *logrus.Entry {
-					entry := Log.WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
+					entry := Logger.WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
 					entry.Level = logrus.TraceLevel
 					entry.Message = "entry1"
 					return entry
@@ -53,7 +59,7 @@ func TestFormatter_Format(t *testing.T) {
 			want: []byte("[2020-12-12 12:12:12,012][warning][a=b] - entry2\n"),
 			args: args{
 				entry: func() *logrus.Entry {
-					entry := Log.WithField("a", "b").WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
+					entry := Logger.WithField("a", "b").WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
 					entry.Level = logrus.WarnLevel
 					entry.Message = "entry2"
 					return entry
@@ -64,7 +70,7 @@ func TestFormatter_Format(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			f := Log.Formatter
+			f := Logger.Formatter
 			got, _ := f.Format(tt.args.entry)
 			if !reflect.DeepEqual(got, tt.want) {
 				t.Errorf("Format() got = %s, want %s", got, tt.want)
@@ -72,3 +78,31 @@ func TestFormatter_Format(t *testing.T) {
 		})
 	}
 }
+
+// TestSetLevel checks that the ConfigOption returned by SetLevel applies the
+// parsed level to a logrus logger.
+func TestSetLevel(t *testing.T) {
+	type args struct {
+		opts ConfigOption
+	}
+	tests := []struct {
+		name string
+		args args
+		want logrus.Level
+	}{
+		{
+			// The case name matches the level under test.
+			name: "warn",
+			args: args{
+				opts: SetLevel("warn"),
+			},
+			want: logrus.WarnLevel,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			logger := logrus.New()
+			tt.args.opts(logger)
+			if logger.Level != tt.want {
+				t.Errorf("SetLevel() got = %s, want %s", logger.Level, tt.want)
+			}
+		})
+	}
+}
diff --git a/internal/pkg/plugin/default_plugin.go b/internal/pkg/plugin/default_plugin.go
new file mode 100644
index 0000000..cab5adc
--- /dev/null
+++ b/internal/pkg/plugin/default_plugin.go
@@ -0,0 +1,76 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plugin
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/spf13/viper"
+)
+
+// DefaultPluginNameField is a required field in DefaultConfig.
+const DefaultPluginNameField = "plugin_name"
+
+// DefaultInitializingPlugin defines the plugins initialized by defaultInitializing.
+type DefaultInitializingPlugin interface {
+	Plugin
+	// DefaultConfig returns the default config, that is a YAML pattern.
+	DefaultConfig() string
+}
+
+// DefaultConfig is used to initialize the DefaultInitializingPlugin.
+type DefaultConfig map[string]interface{}
+
+// defaultNameFinder returns the plugin name stored under DefaultPluginNameField
+// in a DefaultConfig. It panics when cfg is not a DefaultConfig, or when the
+// field is missing or is not a string.
+func defaultNameFinder(cfg interface{}) string {
+	c, ok := cfg.(DefaultConfig)
+	if !ok {
+		panic(fmt.Errorf("defaultNameFinder only supports DefaultConfig"))
+	}
+	v, ok := c[DefaultPluginNameField]
+	if !ok {
+		panic(fmt.Errorf("%s is required in DefaultConfig", DefaultPluginNameField))
+	}
+	// A checked assertion reports the offending field instead of a bare runtime conversion panic.
+	name, ok := v.(string)
+	if !ok {
+		panic(fmt.Errorf("%s in DefaultConfig must be a string, but got %T", DefaultPluginNameField, v))
+	}
+	return name
+}
+
+// defaultInitializing fills the plugin's fields from configuration. The
+// plugin's DefaultConfig YAML, when not empty, is read first and the entries
+// of cfg are merged into it; the merged result is unmarshaled into plugin.
+// It panics when cfg is not a DefaultConfig, when plugin does not implement
+// DefaultInitializingPlugin, or when reading, merging or unmarshaling fails.
+func defaultInitializing(plugin Plugin, cfg interface{}) {
+	c, ok := cfg.(DefaultConfig)
+	if !ok {
+		panic(fmt.Errorf("%s plugin is a DefaultInitializingPlugin, but the type of configuration is illegal", plugin.Name()))
+	}
+	// A checked assertion names the plugin instead of raising a bare runtime conversion panic.
+	p, ok := plugin.(DefaultInitializingPlugin)
+	if !ok {
+		panic(fmt.Errorf("%s plugin does not implement DefaultInitializingPlugin", plugin.Name()))
+	}
+	v := viper.New()
+	v.SetConfigType("yaml")
+	// Fetch the defaults once; the custom values are merged in below.
+	if defaults := p.DefaultConfig(); defaults != "" {
+		if err := v.ReadConfig(strings.NewReader(defaults)); err != nil {
+			panic(fmt.Errorf("cannot read default config in the plugin: %s, the error is %v", plugin.Name(), err))
+		}
+	}
+	if err := v.MergeConfigMap(c); err != nil {
+		panic(fmt.Errorf("%s plugin cannot merge the custom configuration, the error is %v", plugin.Name(), err))
+	}
+	if err := v.Unmarshal(plugin); err != nil {
+		panic(fmt.Errorf("cannot inject  the config to the %s plugin, the error is %v", plugin.Name(), err))
+	}
+}
+
+// defaultCallBack does nothing.
+func defaultCallBack(plugin Plugin) {}
diff --git a/main.go b/internal/pkg/plugin/definition.go
similarity index 58%
copy from main.go
copy to internal/pkg/plugin/definition.go
index 94fbc40..bf73689 100644
--- a/main.go
+++ b/internal/pkg/plugin/definition.go
@@ -15,8 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package main
+package plugin
 
-func main() {
-	print("OK")
+// Plugin defines the plugin model in Satellite.
+type Plugin interface {
+	// Name returns the name of the specific plugin.
+	Name() string
+	// Description returns the description of the specific plugin.
+	Description() string
 }
+
+// NameFinderFunc is used to get the plugin name from different plugin configs.
+type NameFinderFunc func(config interface{}) string
+
+// InitializingFunc is used to initialize the specific plugin.
+type InitializingFunc func(plugin Plugin, config interface{})
+
+// CallbackFunc would be invoked after initializing.
+type CallbackFunc func(plugin Plugin)
diff --git a/internal/pkg/plugin/plugin_test.go b/internal/pkg/plugin/plugin_test.go
new file mode 100644
index 0000000..a14eb3b
--- /dev/null
+++ b/internal/pkg/plugin/plugin_test.go
@@ -0,0 +1,101 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plugin
+
+import (
+	"reflect"
+	"testing"
+)
+
+type DemoCategory interface {
+	DefaultInitializingPlugin
+	Say() string
+}
+
+type DemoPlugin struct {
+	Organization string `mapstructure:"organization"`
+	Project      string `mapstructure:"project"`
+}
+
+func (d *DemoPlugin) Say() string {
+	return d.Organization + ":" + d.Project
+}
+
+func (d *DemoPlugin) Name() string {
+	return "demoplugin"
+}
+
+func (d *DemoPlugin) Description() string {
+	return "this is just a demo"
+}
+
+func (d *DemoPlugin) DefaultConfig() string {
+	return `
+organization: "ASF"
+project: "skywalking-satellite"
+`
+}
+
+// TestPlugin checks that Get builds a DemoPlugin whose fields come from the
+// plugin's default YAML config, overridden by the values passed in the
+// DefaultConfig argument.
+func TestPlugin(t *testing.T) {
+	tests := []struct {
+		name string
+		args DefaultConfig
+		want *DemoPlugin
+	}{
+		{
+			name: "test1",
+			args: DefaultConfig{
+				"plugin_name":  "demoplugin",
+				"organization": "CNCF",
+				"project":      "Fluentd",
+			},
+			want: &DemoPlugin{
+				Organization: "CNCF",
+				Project:      "Fluentd",
+			},
+		},
+		{
+			name: "demoplugin",
+			args: DefaultConfig{
+				"plugin_name": "demoplugin",
+			},
+			want: &DemoPlugin{
+				Organization: "ASF",
+				Project:      "skywalking-satellite",
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			defer func() {
+				// Report the recovered value: Get and the init funcs panic for several different reasons.
+				if r := recover(); r != nil {
+					t.Errorf("Get() panicked for the plugin %s: %v", "demoplugin", r)
+				}
+			}()
+			plugin := Get(reflect.TypeOf((*DemoCategory)(nil)).Elem(), tt.args)
+			if !reflect.DeepEqual(plugin, tt.want) {
+				t.Errorf("Get() got = %v, want %v", plugin, tt.want)
+			}
+		})
+	}
+}
+
+func init() {
+	RegisterPluginCategory(reflect.TypeOf((*DemoCategory)(nil)).Elem(), nil, nil, nil)
+	RegisterPlugin(&DemoPlugin{})
+}
diff --git a/internal/pkg/plugin/registry.go b/internal/pkg/plugin/registry.go
index 4731d36..9059d66 100644
--- a/internal/pkg/plugin/registry.go
+++ b/internal/pkg/plugin/registry.go
@@ -23,57 +23,86 @@ import (
 	"sync"
 )
 
-// All plugins is wrote in ./plugins dir. The plugin type would be as the next level dirs,
-// such as collector, client, or queue. And the 3rd level is the plugin name, that is also
-// used as key in pluginRegistry.
-
-// reg is the global plugin registry
+// the global plugin registry
 var (
-	reg  map[reflect.Type]map[string]reflect.Value
-	lock sync.Mutex
+	lock              sync.Mutex
+	reg               map[reflect.Type]map[string]reflect.Value
+	initFuncReg       map[reflect.Type]InitializingFunc
+	callbackFuncReg   map[reflect.Type]CallbackFunc
+	nameFinderFuncReg map[reflect.Type]NameFinderFunc
 )
 
 func init() {
 	reg = make(map[reflect.Type]map[string]reflect.Value)
+	initFuncReg = make(map[reflect.Type]InitializingFunc)
+	callbackFuncReg = make(map[reflect.Type]CallbackFunc)
+	nameFinderFuncReg = make(map[reflect.Type]NameFinderFunc)
 }
 
-// Add new plugin category. The different plugin category could have same plugin names.
-func AddPluginCategory(pluginCategory reflect.Type) {
+// RegisterPluginCategory registers a new plugin category.
+// Required:
+// pluginCategory: the plugin interface type.
+// Optional:
+// n: the plugin name finder, and the default value is defaultNameFinder.
+// i: the plugin initializer, and the default value is defaultInitializing.
+// c: the plugin initializer callback func, and the default value is defaultCallBack.
+func RegisterPluginCategory(pluginCategory reflect.Type, n NameFinderFunc, i InitializingFunc, c CallbackFunc) {
 	lock.Lock()
 	defer lock.Unlock()
 	reg[pluginCategory] = map[string]reflect.Value{}
+
+	if n == nil {
+		nameFinderFuncReg[pluginCategory] = defaultNameFinder
+	} else {
+		nameFinderFuncReg[pluginCategory] = n
+	}
+	if i == nil {
+		initFuncReg[pluginCategory] = defaultInitializing
+	} else {
+		initFuncReg[pluginCategory] = i
+	}
+	if c == nil {
+		callbackFuncReg[pluginCategory] = defaultCallBack
+	} else {
+		callbackFuncReg[pluginCategory] = c
+	}
 }
 
 // RegisterPlugin registers the pluginType as plugin.
 // If the plugin is a pointer receiver, please pass a pointer. Otherwise, please pass a value.
-func RegisterPlugin(pluginName string, plugin interface{}) {
+func RegisterPlugin(plugin Plugin) {
 	lock.Lock()
 	defer lock.Unlock()
 	v := reflect.ValueOf(plugin)
 	success := false
 	for pCategory, pReg := range reg {
 		if v.Type().Implements(pCategory) {
-			pReg[pluginName] = v
-			fmt.Printf("register %s %s successfully ", pluginName, v.Type().String())
+			pReg[plugin.Name()] = v
+			fmt.Printf("register %s %s successfully ", plugin.Name(), v.Type().String())
 			success = true
 		}
 	}
 	if !success {
-		fmt.Printf("this type of %s is not supported to register : %s", pluginName, v.Type().String())
+		fmt.Printf("this type of %s is not supported to register : %s", plugin.Name(), v.Type().String())
 	}
 }
 
-// Get the specific plugin according to the pluginCategory and pluginName.
-func Get(pluginCategory reflect.Type, pluginName string, config map[string]interface{}) Plugin {
-	value, ok := reg[pluginCategory][pluginName]
+// Get returns an initialized plugin of the given category; the plugin name is resolved from cfg by the category's NameFinderFunc.
+func Get(category reflect.Type, cfg interface{}) Plugin {
+	lock.Lock()
+	defer lock.Unlock()
+	pluginName := nameFinderFuncReg[category](cfg)
+	value, ok := reg[category][pluginName]
 	if !ok {
-		panic(fmt.Errorf("cannot find %s plugin, and the category of plugin is %s", pluginName, pluginCategory))
+		panic(fmt.Errorf("cannot find %s plugin, and the category of plugin is %s", pluginName, category))
 	}
 	t := value.Type()
 	if t.Kind() == reflect.Ptr {
 		t = t.Elem()
 	}
+
 	plugin := reflect.New(t).Interface().(Plugin)
-	plugin.InitPlugin(config)
+	initFuncReg[category](plugin, cfg)
+	callbackFuncReg[category](plugin)
 	return plugin
 }
diff --git a/main.go b/internal/pkg/test/path.go
similarity index 69%
copy from main.go
copy to internal/pkg/test/path.go
index 94fbc40..cf5adcc 100644
--- a/main.go
+++ b/internal/pkg/test/path.go
@@ -15,8 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package main
+package test
 
-func main() {
-	print("OK")
+import (
+	"fmt"
+	"os"
+	"strings"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/constant"
+)
+
+// FindRootPath returns the working directory truncated right after the first
+// occurrence of constant.ProjectName. It returns an error when the working
+// directory cannot be read or does not contain the project name.
+func FindRootPath() (string, error) {
+	pwd, err := os.Getwd()
+	if err != nil {
+		return "", fmt.Errorf("could not find the project root path: %v", err)
+	}
+	idx := strings.Index(pwd, constant.ProjectName)
+	if idx < 0 {
+		// Without this check a result of -1 would silently produce a wrong prefix of pwd.
+		return "", fmt.Errorf("could not find the project root path: %q does not contain %s", pwd, constant.ProjectName)
+	}
+	return pwd[:idx+len(constant.ProjectName)], nil
 }
diff --git a/internal/satellite/boot/boot.go b/internal/satellite/boot/boot.go
new file mode 100644
index 0000000..b8ae2db
--- /dev/null
+++ b/internal/satellite/boot/boot.go
@@ -0,0 +1,105 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package boot
+
+import (
+	"context"
+	"os"
+	"sync"
+	"syscall"
+
+	"os/signal"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/module"
+)
+
+// Start boots Satellite: it installs the shutdown signal listener, initializes
+// the logger and the modules from cfg, prepares every module and then boots
+// them. It returns the error reported by the preparing stage, or nil once
+// bootModules has returned.
+func Start(cfg *config.SatelliteConfig) error {
+	ctx, cancel := context.WithCancel(context.Background())
+	// Always release the context, including when the preparing stage fails.
+	defer cancel()
+	addShutdownListener(cancel)
+	initLogger(cfg)
+	initModules(cfg)
+	if err := prepareModules(); err != nil {
+		return err
+	}
+	bootModules(ctx)
+	return nil
+}
+
+// addShutdownListener adds a close signal listener that calls cancel when a signal arrives.
+func addShutdownListener(cancel context.CancelFunc) {
+	signals := make(chan os.Signal, 1)
+	signal.Notify(signals, os.Interrupt, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT)
+	go func() {
+		<-signals
+		cancel()
+	}()
+}
+
+// initLogger init the global logger.
+func initLogger(cfg *config.SatelliteConfig) {
+	log.Init(cfg.Logger)
+}
+
+// initModules init the modules and register the modules to the module container.
+func initModules(cfg *config.SatelliteConfig) {
+	log.Logger.Infof("satellite is initializing...")
+	for _, agentConfig := range cfg.Agents {
+		if agentConfig.ClientManager != nil {
+			module.NewModuleService(agentConfig.ClientManager)
+		}
+		if agentConfig.Sender != nil {
+			module.NewModuleService(agentConfig.Sender)
+		}
+		if agentConfig.Processor != nil {
+			module.NewModuleService(agentConfig.Processor)
+		}
+		if agentConfig.Gatherer != nil {
+			module.NewModuleService(agentConfig.Gatherer)
+		}
+	}
+}
+
+// prepareModules makes that all modules are in a bootable state.
+func prepareModules() error {
+	log.Logger.Infof("satellite is prepare to start...")
+	for _, service := range module.GetModuleContainer() {
+		if err := service.Prepare(); err != nil {
+			log.Logger.Errorf("%s module of %s namespace is error in preparing stage, error is %v", service.Name(), service.Config().NameSpace(), err)
+			return err
+		}
+	}
+	return nil
+}
+
+// bootModules boots every module in the module container, each in its own
+// goroutine, and blocks until all of their Boot calls have returned.
+func bootModules(ctx context.Context) {
+	log.Logger.Infof("satellite is starting...")
+	var wg sync.WaitGroup
+	for _, service := range module.GetModuleContainer() {
+		service := service
+		// Register with the WaitGroup before the goroutine starts; calling Add
+		// inside the goroutine lets Wait return before any module has booted.
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			service.Boot(ctx)
+		}()
+	}
+	wg.Wait()
+}
diff --git a/plugins/client/api/client.go b/internal/satellite/config/clientmanager_config.go
similarity index 58%
copy from plugins/client/api/client.go
copy to internal/satellite/config/clientmanager_config.go
index 51d2b65..3f8bdfa 100644
--- a/plugins/client/api/client.go
+++ b/internal/satellite/config/clientmanager_config.go
@@ -15,33 +15,29 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package api
+package config
 
 import (
-	"reflect"
-
+	"github.com/apache/skywalking-satellite/internal/pkg/constant"
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
 
-// Client is a plugin interface, that defines new clients, such as gRPC client and Kafka client.
-type Client interface {
-	plugin.Plugin
+// ClientManagerConfig defines the initialization params for ClientManager.
+type ClientManagerConfig struct {
+	// common config
+	common ModuleCommonConfig
 
-	// Prepare would make connection with outer service.
-	Prepare()
-	// GetConnection returns the connected client to publish events.
-	GetConnectedClient() interface{}
-	// Close the connection with outer service.
-	Close()
-}
+	// plugins config
+	ClientConfig plugin.DefaultConfig `mapstructure:"client"` // the client plugin config
 
-var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
+	// self config
+	RetryInterval int64 `mapstructure:"retry_interval"` // the client retry interval when disconnected.
+}
 
-// Get client plugin.
-func GetClient(pluginName string, config map[string]interface{}) Client {
-	return plugin.Get(ClientCategory, pluginName, config).(Client)
+func (c *ClientManagerConfig) ModuleName() string {
+	return constant.ClientManagerModule
 }
 
-func init() {
-	plugin.AddPluginCategory(ClientCategory)
+func (c *ClientManagerConfig) NameSpace() string {
+	return c.common.RunningNamespace
 }
diff --git a/plugins/filter/example/filter.go b/internal/satellite/config/gatherer_config.go
similarity index 56%
rename from plugins/filter/example/filter.go
rename to internal/satellite/config/gatherer_config.go
index 14abb69..ded14e1 100644
--- a/plugins/filter/example/filter.go
+++ b/internal/satellite/config/gatherer_config.go
@@ -15,40 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package example
+package config
 
-import "github.com/apache/skywalking-satellite/internal/pkg/event"
+import (
+	"github.com/apache/skywalking-satellite/internal/pkg/constant"
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
 
-type demoFilter struct {
-	a string
-}
-
-type demoFilter2 struct {
-	a string
-}
-
-type demoFilter3 struct {
-	a string
-}
-
-func (d *demoFilter) Description() string {
-	panic("implement me")
-}
-
-func (d *demoFilter) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoFilter) Process(in event.Event) event.Event {
-	panic("implement me")
-}
+type GathererConfig struct {
+	// common config
+	common ModuleCommonConfig
 
-func (d demoFilter2) Description() string {
-	panic("implement me")
+	// plugins config
+	CollectorConfig plugin.DefaultConfig `mapstructure:"collector"` // collector plugin config
+	QueueConfig     plugin.DefaultConfig `mapstructure:"queue"`     // queue plugin config
 }
 
-func (d demoFilter2) InitPlugin(config map[string]interface{}) {
+func (g *GathererConfig) ModuleName() string {
+	return constant.GathererModule
 }
 
-func (d demoFilter2) Process(in event.Event) event.Event {
-	panic("implement me")
+func (g *GathererConfig) NameSpace() string {
+	return g.common.RunningNamespace
 }
diff --git a/internal/satellite/config/loader.go b/internal/satellite/config/loader.go
new file mode 100644
index 0000000..c9b0dac
--- /dev/null
+++ b/internal/satellite/config/loader.go
@@ -0,0 +1,68 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package config
+
+import (
+	"bytes"
+	"fmt"
+	"sync"
+
+	"io/ioutil"
+	"path/filepath"
+
+	"github.com/spf13/viper"
+)
+
+var (
+	cfgLock sync.Mutex
+)
+
+// Load reads the SatelliteConfig from configPath and panics when it cannot be
+// loaded. The func could not use the global logger of Satellite, because it is
+// executed before logger initialization, so it prints to stdout instead.
+func Load(configPath string) *SatelliteConfig {
+	cfgLock.Lock()
+	defer cfgLock.Unlock()
+	fmt.Printf("load config from : %s\n", configPath)
+	cfg, err := load(configPath)
+	if err != nil {
+		panic(fmt.Errorf("could not load config from the path: %s, the error is :%v", configPath, err))
+	}
+	return cfg
+}
+
+// load SatelliteConfig from the yaml config.
+func load(configPath string) (*SatelliteConfig, error) {
+	absolutePath, err := filepath.Abs(configPath)
+	if err != nil {
+		return nil, err
+	}
+	content, err := ioutil.ReadFile(absolutePath)
+	if err != nil {
+		return nil, err
+	}
+	v := viper.New()
+	v.SetConfigType("yaml")
+	cfg := SatelliteConfig{}
+	if err := v.ReadConfig(bytes.NewReader(content)); err != nil {
+		return nil, err
+	}
+	if err := v.Unmarshal(&cfg); err != nil {
+		return nil, err
+	}
+	return &cfg, nil
+}
diff --git a/internal/satellite/config/loader_test.go b/internal/satellite/config/loader_test.go
new file mode 100644
index 0000000..e31c7f2
--- /dev/null
+++ b/internal/satellite/config/loader_test.go
@@ -0,0 +1,133 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package config
+
+import (
+	"encoding/json"
+	"reflect"
+	"testing"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/internal/pkg/test"
+)
+
+func TestLoad(t *testing.T) {
+	rootPath, err := test.FindRootPath()
+	if err != nil {
+		panic(err)
+	}
+	type args struct {
+		configPath string
+	}
+	tests := []struct {
+		name string
+		args args
+		want *SatelliteConfig
+	}{
+		{
+			name: "Legal configuration",
+			args: args{configPath: rootPath + "/configs/satellite_config.yaml"},
+			want: &SatelliteConfig{
+				Logger: &log.LoggerConfig{
+					LogPattern:  "%time [%level][%field] - %msg",
+					TimePattern: "2006-01-02 15:04:05.001",
+					Level:       "info",
+				},
+				Agents: []*AgentConfig{
+					{
+						ModuleCommonConfig: &ModuleCommonConfig{
+							RunningNamespace: "namespace1",
+						},
+
+						Gatherer: &GathererConfig{
+							CollectorConfig: plugin.DefaultConfig{
+								"plugin_name": "segment-receiver",
+								"k":           "v",
+							},
+							QueueConfig: plugin.DefaultConfig{
+								"plugin_name": "mmap-queue",
+								"key":         "value",
+							},
+						},
+						Processor: &ProcessorConfig{
+							FilterConfig: []plugin.DefaultConfig{
+								{
+									"plugin_name": "filtertype1",
+									"key":         "value",
+								},
+							},
+						},
+						Sender: &SenderConfig{
+							FlushTime:     5000,
+							MaxBufferSize: 100,
+							ForwardersConfig: []plugin.DefaultConfig{
+								{
+									"plugin_name": "segment-forwarder",
+									"key":         "value",
+								},
+							},
+						},
+					},
+					{
+						ModuleCommonConfig: &ModuleCommonConfig{
+							RunningNamespace: "sharing",
+						},
+						ClientManager: &ClientManagerConfig{
+							RetryInterval: 5000,
+							ClientConfig: plugin.DefaultConfig{
+								"plugin_name": "grpc",
+								"k":           "v",
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			c, err := load(tt.args.configPath)
+			if err != nil {
+				t.Fatalf("cannot load config: %v", err)
+			}
+			doJudgeEqual(t, c.Logger, tt.want.Logger)
+			doJudgeEqual(t, c.Agents[0].ModuleCommonConfig, tt.want.Agents[0].ModuleCommonConfig)
+			doJudgeEqual(t, c.Agents[0].Gatherer, tt.want.Agents[0].Gatherer)
+			doJudgeEqual(t, c.Agents[0].Processor, tt.want.Agents[0].Processor)
+			doJudgeEqual(t, c.Agents[0].Sender, tt.want.Agents[0].Sender)
+
+			doJudgeEqual(t, c.Agents[1].ModuleCommonConfig, tt.want.Agents[1].ModuleCommonConfig)
+			doJudgeEqual(t, c.Agents[1].ClientManager, tt.want.Agents[1].ClientManager)
+		})
+	}
+}
+
+// doJudgeEqual fails the test immediately when a and b are not deeply equal,
+// reporting both values encoded as JSON (a as "got", b as "want").
+func doJudgeEqual(t *testing.T, a, b interface{}) {
+	// Mark as a helper so failures are attributed to the calling line.
+	t.Helper()
+	if reflect.DeepEqual(a, b) {
+		return
+	}
+	ajson, err := json.Marshal(a)
+	if err != nil {
+		t.Fatalf("cannot do json format: %v", err)
+	}
+	bjson, err := json.Marshal(b)
+	if err != nil {
+		t.Fatalf("cannot do json format: %v", err)
+	}
+	t.Fatalf("config is not equal, got %s, want %s", ajson, bjson)
+}
diff --git a/main.go b/internal/satellite/config/processor_config.go
similarity index 62%
rename from main.go
rename to internal/satellite/config/processor_config.go
index 94fbc40..dd65844 100644
--- a/main.go
+++ b/internal/satellite/config/processor_config.go
@@ -15,8 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package main
+package config
 
-func main() {
-	print("OK")
+import (
+	"github.com/apache/skywalking-satellite/internal/pkg/constant"
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+type ProcessorConfig struct {
+	// common config
+	common *ModuleCommonConfig
+
+	// plugins config
+	FilterConfig []plugin.DefaultConfig `mapstructure:"filters"` // filter plugins
+}
+
+func (p *ProcessorConfig) ModuleName() string {
+	return constant.ProcessorModule
+}
+
+// NameSpace returns the running namespace of the processor module. It returns
+// an empty string when the common config has not been set: common is a pointer
+// in ProcessorConfig, so dereferencing it while nil would panic.
+func (p *ProcessorConfig) NameSpace() string {
+	if p.common == nil {
+		return ""
+	}
+	return p.common.RunningNamespace
 }
diff --git a/internal/satellite/config/satellite_config.go b/internal/satellite/config/satellite_config.go
new file mode 100644
index 0000000..9160cc6
--- /dev/null
+++ b/internal/satellite/config/satellite_config.go
@@ -0,0 +1,51 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package config
+
+import (
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+)
+
+// SatelliteConfig is to initialize Satellite.
+type SatelliteConfig struct {
+	Logger *log.LoggerConfig `mapstructure:"logger"`
+	Agents []*AgentConfig    `mapstructure:"agents"`
+}
+
+// AgentConfig initializes the different module in different namespace.
+type AgentConfig struct {
+	ModuleCommonConfig *ModuleCommonConfig  `mapstructure:"common_config"`
+	ClientManager      *ClientManagerConfig `mapstructure:"client_manager"`
+	Gatherer           *GathererConfig      `mapstructure:"gatherer"`
+	Processor          *ProcessorConfig     `mapstructure:"processor"`
+	Sender             *SenderConfig        `mapstructure:"sender"`
+}
+
+// ModuleConfig is an interface to define the initialization fields in ModuleService.
+type ModuleConfig interface {
+	// ModuleName returns the name of current module.
+	ModuleName() string
+	// NameSpace returns the current running namespace. Satellite support multi-agents running in one Satellite
+	// process, the namespace is a space concept to distinguish different agents.
+	NameSpace() string
+}
+
+// ModuleCommonConfig has some common fields of each ModuleConfig.
+type ModuleCommonConfig struct {
+	RunningNamespace string `mapstructure:"namespace"`
+}
diff --git a/plugins/client/api/client.go b/internal/satellite/config/sender_config.go
similarity index 52%
copy from plugins/client/api/client.go
copy to internal/satellite/config/sender_config.go
index 51d2b65..d2fa50d 100644
--- a/plugins/client/api/client.go
+++ b/internal/satellite/config/sender_config.go
@@ -15,33 +15,31 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package api
+package config
 
 import (
-	"reflect"
-
+	"github.com/apache/skywalking-satellite/internal/pkg/constant"
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
 
-// Client is a plugin interface, that defines new clients, such as gRPC client and Kafka client.
-type Client interface {
-	plugin.Plugin
+type SenderConfig struct {
+	// common config
+	common ModuleCommonConfig
 
-	// Prepare would make connection with outer service.
-	Prepare()
-	// GetConnection returns the connected client to publish events.
-	GetConnectedClient() interface{}
-	// Close the connection with outer service.
-	Close()
-}
+	// plugins config
+	ForwardersConfig []plugin.DefaultConfig `mapstructure:"forwarders"` // forwarder plugins config
+	FallbackerConfig plugin.DefaultConfig   `mapstructure:"fallbacker"` // fallbacker plugins config
 
-var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
+	// self config
+	MaxBufferSize  int `mapstructure:"max_buffer_size"`  // the max buffer capacity
+	MinFlushEvents int `mapstructure:"min_flush_events"` // the min flush events when receives a timer flush signal
+	FlushTime      int `mapstructure:"flush_time"`       // the period flush time
+}
 
-// Get client plugin.
-func GetClient(pluginName string, config map[string]interface{}) Client {
-	return plugin.Get(ClientCategory, pluginName, config).(Client)
+func (s *SenderConfig) ModuleName() string {
+	return constant.SenderModule
 }
 
-func init() {
-	plugin.AddPluginCategory(ClientCategory)
+func (s *SenderConfig) NameSpace() string {
+	return s.common.RunningNamespace
 }
diff --git a/internal/satellite/module/buffer/buffer.go b/internal/satellite/module/buffer/buffer.go
new file mode 100644
index 0000000..ec3c59e
--- /dev/null
+++ b/internal/satellite/module/buffer/buffer.go
@@ -0,0 +1,94 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package buffer
+
+import (
+	"sync"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+)
+
+// BatchBuffer is a buffer to cache the input data in Sender.
+type BatchBuffer struct {
+	sync.Mutex                             // local
+	buf        []*event.OutputEventContext // cache
+	first      int64                       // the first OutputEventContext offset
+	last       int64                       // the last OutputEventContext offset
+	size       int                         // usage size
+	cap        int                         // the max capacity
+}
+
+// NewBatchBuffer creates a new BatchBuffer according to the capacity param.
+func NewBatchBuffer(capacity int) *BatchBuffer {
+	return &BatchBuffer{
+		buf:   make([]*event.OutputEventContext, capacity),
+		first: 0,
+		last:  0,
+		size:  0,
+		cap:   capacity,
+	}
+}
+
+// Buf returns the cached data in BatchBuffer.
+func (b *BatchBuffer) Buf() []*event.OutputEventContext {
+	b.Lock()
+	defer b.Unlock()
+	return b.buf
+}
+
+// First returns the first OutputEventContext offset.
+func (b *BatchBuffer) First() int64 {
+	b.Lock()
+	defer b.Unlock()
+	return b.first
+}
+
+// Len returns the usage size.
+func (b *BatchBuffer) Len() int {
+	b.Lock()
+	defer b.Unlock()
+	return b.size
+}
+
+// BatchSize returns the offset increment of the cached data, i.e. the number of
+// offsets spanned from the first to the last cached item, both inclusive. It
+// returns 0 when the buffer holds no data.
+func (b *BatchBuffer) BatchSize() int {
+	b.Lock()
+	defer b.Unlock()
+	if b.size == 0 {
+		// first and last are both still 0 here, so the formula would report 1.
+		return 0
+	}
+	return int(b.last - b.first + 1)
+}
+
+// Add appends data to the buffer. The item is dropped, and an error is logged,
+// when the buffer is already full, when data is nil, or when its offset is not
+// positive.
+func (b *BatchBuffer) Add(data *event.OutputEventContext) {
+	b.Lock()
+	defer b.Unlock()
+	switch {
+	case b.size == b.cap:
+		log.Logger.Errorf("cannot add one item to the fulling BatchBuffer, the capacity is %d", b.cap)
+		return
+	case data == nil:
+		log.Logger.Errorf("cannot add one item to BatchBuffer because the input data is nil")
+		return
+	case data.Offset <= 0:
+		log.Logger.Errorf("cannot add one item to BatchBuffer because the input data is illegal, the offset is %d", data.Offset)
+		return
+	}
+	if b.size == 0 {
+		b.first = data.Offset
+	}
+	// Every accepted item is the newest one, so it always becomes the last
+	// offset; this keeps last >= first even when only one item is cached.
+	b.last = data.Offset
+	b.buf[b.size] = data
+	b.size++
+}
diff --git a/plugins/client/example/client_test.go b/internal/satellite/module/buffer/buffer_test.go
similarity index 54%
rename from plugins/client/example/client_test.go
rename to internal/satellite/module/buffer/buffer_test.go
index a9e355e..be9e9a8 100644
--- a/plugins/client/example/client_test.go
+++ b/internal/satellite/module/buffer/buffer_test.go
@@ -15,57 +15,58 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package example
+package buffer
 
 import (
 	"testing"
 
-	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
-	"github.com/apache/skywalking-satellite/plugins/client/api"
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
 )
 
-func Test_Register(t *testing.T) {
+func TestNewBuffer(t *testing.T) {
+	buffer := NewBatchBuffer(3)
 	tests := []struct {
-		name  string
-		args  interface{}
-		panic bool
+		name string
+		args *event.OutputEventContext
+		want int
 	}{
 		{
-			name: "demoClient",
-			args: demoClient{
-				a: "s",
-			},
-			panic: false,
+			name: "add-1",
+			args: &event.OutputEventContext{Offset: 1},
+			want: 1,
 		},
 		{
-			name: "demoClient2",
-			args: &demoClient2{
-				a: "s",
-			},
-			panic: false,
+			name: "add-2",
+			args: &event.OutputEventContext{Offset: 2},
+			want: 2,
 		},
 		{
-			name: "demoClient3",
-			args: demoClient3{
-				a: "s",
-			},
-			panic: true,
+			name: "add-3",
+			args: &event.OutputEventContext{Offset: 3},
+			want: 3,
+		},
+		{
+			name: "add-4",
+			args: &event.OutputEventContext{Offset: 4},
+			want: 3,
+		},
+		{
+			name: "add-5",
+			args: &event.OutputEventContext{Offset: 5},
+			want: 3,
 		},
 	}
-
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			plugin.RegisterPlugin(tt.name, tt.args)
-			assertPanic(t, tt.name, nil, tt.panic)
+			buffer.Add(tt.args)
+			if got := buffer.Len(); got != tt.want {
+				t.Errorf("Buffer Len() = %v, want %v", got, tt.want)
+			}
 		})
 	}
 }
 
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
-	defer func() {
-		if r := recover(); r != nil && !existPanic {
-			t.Errorf("the plugin %s is not pass", name)
-		}
-	}()
-	api.GetClient(name, config)
+func init() {
+	log.Init(&log.LoggerConfig{})
 }
diff --git a/internal/satellite/module/clientmanager.go b/internal/satellite/module/clientmanager.go
new file mode 100644
index 0000000..3d98cac
--- /dev/null
+++ b/internal/satellite/module/clientmanager.go
@@ -0,0 +1,182 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package module
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/constant"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/config"
+	client "github.com/apache/skywalking-satellite/plugins/client/api"
+)
+
+// The specific client statuses.
+const (
+	_ ClientStatus = iota
+	Connected
+	Disconnect
+)
+
+// ClientStatus represents the status of the client.
+type ClientStatus int8
+
+// ClientManager is a module plugin to control the connection with the outer service.
+type ClientManager struct {
+	sync.Mutex
+
+	// config
+	config *config.ClientManagerConfig
+
+	// dependency plugins
+	runningClient client.Client
+
+	// self components
+	listeners []chan ClientStatus // the sender client status listeners
+
+	// Metrics:
+	retryCount int64
+	status     ClientStatus
+}
+
+func (c *ClientManager) Name() string {
+	return constant.ClientManagerModule
+}
+
+func (c *ClientManager) Description() string {
+	return "keep connection with external services, such as Kafka and OAP."
+}
+
+func (c *ClientManager) Config() config.ModuleConfig {
+	return c.config
+}
+
+// Init ClientManager, dependency plugins and self components. cfg must be a
+// *config.ClientManagerConfig; any other type makes the type assertion panic.
+func (c *ClientManager) Init(cfg config.ModuleConfig) {
+	// Assign the config first: the log line below reads c.config, which is
+	// still nil until this assignment.
+	c.config = cfg.(*config.ClientManagerConfig)
+	log.Logger.Infof("%s module of %s namespace is being initialized", c.Name(), c.config.NameSpace())
+	c.runningClient = client.GetClient(c.config.ClientConfig)
+	c.listeners = []chan ClientStatus{}
+}
+
+// Prepare makes the connection with external services, such as Kafka and OAP.
+func (c *ClientManager) Prepare() error {
+	log.Logger.Infof("%s module of %s namespace is in preparing stage", c.Name(), c.config.NameSpace())
+	return c.initializeClient()
+}
+
+// Boot starts ClientManager to maintain the connection with external services.
+// It calls connectClient every RetryInterval milliseconds and blocks until ctx
+// is done, at which point it calls Shutdown and returns.
+func (c *ClientManager) Boot(ctx context.Context) {
+	log.Logger.Infof("%s module of %s namespace is running", c.Name(), c.config.NameSpace())
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		timeTicker := time.NewTicker(time.Duration(c.config.RetryInterval) * time.Millisecond)
+		// Stop the ticker when the loop exits so its resources are released.
+		defer timeTicker.Stop()
+		for {
+			select {
+			case <-timeTicker.C:
+				if err := c.connectClient(); err != nil {
+					log.Logger.Errorf("cannot make a connection with the %s client: %v", c.runningClient.Name(), err)
+				}
+			case <-ctx.Done():
+				c.Shutdown()
+				return
+			}
+		}
+	}()
+	wg.Wait()
+}
+
+// Shutdown close the connection and listeners.
+func (c *ClientManager) Shutdown() {
+	log.Logger.Infof("%s module of %s namespace is closing", c.Name(), c.config.NameSpace())
+	for _, listener := range c.listeners {
+		close(listener)
+	}
+	if err := c.runningClient.Close(); err != nil {
+		log.Logger.Errorf("an error occurring when closing %s client: %v", c.runningClient.Name(), err)
+	}
+}
+
+// RegisterListener adds the listener to listen to the status of the client.
+func (c *ClientManager) RegisterListener(listener chan ClientStatus) {
+	c.Lock()
+	defer c.Unlock()
+	c.listeners = append(c.listeners, listener)
+}
+
+// GetConnectedClient returns a connected client when the running client plugin has one. Otherwise, would return a nil client.
+func (c *ClientManager) GetConnectedClient() interface{} {
+	return c.runningClient.GetConnectedClient()
+}
+
+// ReportError reports that the client is disconnected.
+func (c *ClientManager) ReportError() {
+	c.Lock()
+	defer c.Unlock()
+	if c.status == Connected {
+		c.status = Disconnect
+		c.notify()
+	}
+}
+
+// initializeClient initializes the connection with external services and
+// retries one time, after a 10 second pause, when the first attempt fails. It
+// returns the error of the last attempt.
+func (c *ClientManager) initializeClient() error {
+	// connectClient acquires c's mutex itself and sync.Mutex is not reentrant,
+	// so the lock must not be taken here; it would also be held across the sleep.
+	if err := c.connectClient(); err != nil {
+		log.Logger.Infof("preparing to reconnect with %s client,retrying in 10s", c.runningClient.Name())
+		time.Sleep(10 * time.Second)
+		return c.connectClient()
+	}
+	return nil
+}
+
+// connectClient would make a connection with external services, such as Kafka and OAP. When successfully connected,
+// ClientManager would notify the connected status to all senders. It is a no-op returning nil when the running
+// client already reports itself as connected; otherwise it returns the error of the connect attempt.
+func (c *ClientManager) connectClient() error {
+	c.Lock()
+	defer c.Unlock()
+	if c.runningClient.IsConnected() {
+		return nil
+	}
+	log.Logger.Infof("preparing to connect with %s client", c.runningClient.Name())
+	c.retryCount++
+	err := c.runningClient.Connect()
+	if err == nil {
+		c.status = Connected
+		// The mutex is held here, which is what notify requires.
+		c.notify()
+		log.Logger.Infof("successfully connected to %s client", c.runningClient.Name())
+	}
+	return err
+}
+
+// notify sends the current status to all listeners. The caller must already
+// hold c's mutex: its callers connectClient and ReportError both invoke it
+// with the lock held, and sync.Mutex is not reentrant, so locking again here
+// would deadlock.
+// NOTE(review): each send blocks until the listener receives (or has buffer
+// space); confirm listeners are always drained.
+func (c *ClientManager) notify() {
+	for _, listener := range c.listeners {
+		listener <- c.status
+	}
+}
diff --git a/internal/satellite/module/definition.go b/internal/satellite/module/definition.go
new file mode 100644
index 0000000..6348f03
--- /dev/null
+++ b/internal/satellite/module/definition.go
@@ -0,0 +1,98 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package module
+
+import (
+	"context"
+	"reflect"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/internal/satellite/config"
+)
+
+const (
+	// If a module is stored in the sharing namespace, the modules would be shared with all namespaces in moduleContainer.
+	SharingNamespaceName = "sharing"
+)
+
+// TODO add metrics func
+// Service is a custom plugin interface, which defines the processing.
+type Service interface {
+	plugin.Plugin
+
+	// Config returns the using ModuleConfig.
+	Config() config.ModuleConfig
+
+	// Init initialize the Module and register the instance to the registry.
+	// In this stage, no components is running.
+	Init(config config.ModuleConfig)
+
+	// Prepare would inject the dependency module and do dependency initialization.
+	Prepare() error
+
+	// Boot would start the module and return error when started failed. When a stop signal received
+	// or an exception occurs, the shutdown function would be called.
+	Boot(ctx context.Context)
+
+	// Shutdown could do some clean job to close Service.
+	Shutdown()
+}
+
+var moduleContainer map[string]Service
+
+// NewModuleService returns a new initialized Service and register it to the module container.
+func NewModuleService(cfg config.ModuleConfig) {
+	_ = plugin.Get(reflect.TypeOf((*Service)(nil)).Elem(), cfg).(Service)
+}
+
+// GetRunningModule returns a running Service.
+func GetRunningModule(namespace, moduleName string) Service {
+	if moduleService, ok := moduleContainer[namespace+moduleName]; ok {
+		return moduleService
+	}
+	if moduleService, ok := moduleContainer[SharingNamespaceName+moduleName]; ok {
+		return moduleService
+	}
+	return nil
+}
+
+func GetModuleContainer() map[string]Service {
+	return moduleContainer
+}
+
+// Register the Service category to the plugin registry.
+func init() {
+	moduleContainer = make(map[string]Service)
+	plugin.RegisterPluginCategory(reflect.TypeOf((*Service)(nil)).Elem(),
+		func(cfg interface{}) string {
+			// Get plugin name to find the specific Service plugin.
+			return cfg.(config.ModuleConfig).ModuleName()
+		},
+		func(plugin plugin.Plugin, cfg interface{}) {
+			// Initialize Service.
+			ms := plugin.(Service)
+			mc := cfg.(config.ModuleConfig)
+			ms.Init(mc)
+		},
+		func(plugin plugin.Plugin) {
+			// Register the initialized Service to the moduleContainer.
+			ms := plugin.(Service)
+			moduleContainer[ms.Config().NameSpace()+ms.Config().ModuleName()] = ms
+		},
+	)
+}
diff --git a/internal/satellite/module/gatherer.go b/internal/satellite/module/gatherer.go
new file mode 100644
index 0000000..765f6cc
--- /dev/null
+++ b/internal/satellite/module/gatherer.go
@@ -0,0 +1,112 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package module
+
+import (
+	"context"
+	"sync"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/constant"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/config"
+	collector "github.com/apache/skywalking-satellite/plugins/collector/api"
+	queue "github.com/apache/skywalking-satellite/plugins/queue/api"
+)
+
+// Gatherer is the APM data collection module in Satellite. Its Boot loop reads events from the
+// Collector's event channel and enqueues them into the Queue.
+type Gatherer struct {
+	// config is assigned in Init from the module config passed by the caller.
+	config *config.GathererConfig
+
+	// dependency plugins, both created in Init and closed in Shutdown.
+	runningCollector collector.Collector
+	runningQueue     queue.Queue
+}
+
+// Name returns the module name constant of the Gatherer.
+func (g *Gatherer) Name() string {
+	return constant.GathererModule
+}
+
+// Description returns a human-readable summary of the Gatherer module.
+func (g *Gatherer) Description() string {
+	return "gatherer is the APM data collection module in Satellite, which supports Log, Trace, and Metrics scopes."
+}
+
+// Config returns the Gatherer config stored by Init. Before Init runs the stored pointer is
+// nil, so the returned interface wraps a nil *config.GathererConfig.
+func (g *Gatherer) Config() config.ModuleConfig {
+	return g.config
+}
+
+// Init stores the Gatherer config and creates the dependency plugins (Collector and Queue).
+// It panics when cfg is not a *config.GathererConfig.
+func (g *Gatherer) Init(cfg config.ModuleConfig) {
+	// The config has to be assigned before the first log line: the log reads the namespace
+	// through g.config, which is still nil when Init is entered.
+	g.config = cfg.(*config.GathererConfig)
+	log.Logger.Infof("%s module of %s namespace is being initialized", g.Name(), g.config.NameSpace())
+	g.runningCollector = collector.GetCollector(g.config.CollectorConfig)
+	// NOTE(review): the queue is created from CollectorConfig, the same config used for the
+	// collector - confirm whether a dedicated queue config was intended here.
+	g.runningQueue = queue.GetQueue(g.config.CollectorConfig)
+}
+
+// Prepare runs the preparing stage of the Collector and then of the Queue. The first failure
+// is logged and returned; nil is returned when both plugins are prepared.
+func (g *Gatherer) Prepare() error {
+	log.Logger.Infof("%s module of %s namespace is in preparing stage", g.Name(), g.config.NameSpace())
+
+	collectorErr := g.runningCollector.Prepare()
+	if collectorErr != nil {
+		log.Logger.Infof("%s collector of %s namespace was failed to initialize", g.runningCollector.Name(), g.config.NameSpace())
+		return collectorErr
+	}
+
+	queueErr := g.runningQueue.Prepare()
+	if queueErr != nil {
+		log.Logger.Infof("the %s queue of %s namespace was failed to initialize", g.runningQueue.Name(), g.config.NameSpace())
+		return queueErr
+	}
+
+	return nil
+}
+
+// Boot fetches Collector input data and pushes it into Queue. It blocks until ctx is
+// cancelled, at which point Shutdown is called and Boot returns.
+func (g *Gatherer) Boot(ctx context.Context) {
+	log.Logger.Infof("%s module of %s namespace is running", g.Name(), g.config.NameSpace())
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		// Without this Done the Wait below would never return, even after cancellation.
+		defer wg.Done()
+		for {
+			select {
+			case e := <-g.runningCollector.EventChannel():
+				err := g.runningQueue.Publisher().Enqueue(e)
+				if err != nil {
+					// todo add abandonedCount metrics
+					// The event is dropped; the loop keeps serving the following events.
+					log.Logger.Errorf("cannot put event into queue in %s namespace, error is: %v", g.config.NameSpace(), err)
+				}
+			case <-ctx.Done():
+				g.Shutdown()
+				return
+			}
+		}
+	}()
+	wg.Wait()
+}
+
+// Shutdown closes the Collector first and the Queue second. A close failure is only logged,
+// so the Queue is closed even when closing the Collector fails.
+func (g *Gatherer) Shutdown() {
+	log.Logger.Infof("%s module of %s namespace is closing", g.Name(), g.config.NameSpace())
+
+	collectorErr := g.runningCollector.Close()
+	if collectorErr != nil {
+		log.Logger.Errorf("failure occurs when closing %s collector  in %s namespace, error is: %v", g.runningCollector.Name(), g.config.NameSpace(), collectorErr)
+	}
+
+	queueErr := g.runningQueue.Close()
+	if queueErr != nil {
+		log.Logger.Errorf("failure occurs when closing %s queue  in %s namespace, error is: %v", g.runningQueue.Name(), g.config.NameSpace(), queueErr)
+	}
+}
+
+// Ack asks the Queue to acknowledge batchSize events beginning at startOffset and blocks
+// until a value can be received from the channel returned by the Queue.
+func (g *Gatherer) Ack(startOffset int64, batchSize int) {
+	acked := g.runningQueue.Ack(startOffset, batchSize)
+	<-acked
+}
diff --git a/internal/satellite/module/processor.go b/internal/satellite/module/processor.go
new file mode 100644
index 0000000..e1cdf96
--- /dev/null
+++ b/internal/satellite/module/processor.go
@@ -0,0 +1,115 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package module
+
+import (
+	"context"
+	"sync"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/constant"
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/config"
+	filter "github.com/apache/skywalking-satellite/plugins/filter/api"
+)
+
+// Processor is the processing module in Satellite. Its Boot loop dequeues events from the
+// Gatherer's Queue, runs them through the filters, and hands the result to the Sender.
+type Processor struct {
+	// config is assigned in Init from the module config passed by the caller.
+	config *config.ProcessorConfig
+
+	// dependency plugins, created in Init and applied in order to every event.
+	runningFilters []filter.Filter
+
+	// dependency modules, resolved from the module container in Prepare.
+	sender   *Sender
+	gatherer *Gatherer
+}
+
+// Name returns the module name constant of the Processor.
+func (p *Processor) Name() string {
+	return constant.ProcessorModule
+}
+
+// Description returns a human-readable summary of the Processor module.
+func (p *Processor) Description() string {
+	return "processor module is the core processing module in Satellite"
+}
+
+// Config returns the Processor config stored by Init. Before Init runs the stored pointer is
+// nil, so the returned interface wraps a nil *config.ProcessorConfig.
+func (p *Processor) Config() config.ModuleConfig {
+	return p.config
+}
+
+// Init stores the Processor config and creates one Filter plugin per configured filter.
+// It panics when cfg is not a *config.ProcessorConfig.
+func (p *Processor) Init(cfg config.ModuleConfig) {
+	// The config has to be assigned before the first log line: the log reads the namespace
+	// through p.config, which is still nil when Init is entered.
+	p.config = cfg.(*config.ProcessorConfig)
+	log.Logger.Infof("%s module of %s namespace is being initialized", p.Name(), p.config.NameSpace())
+	// The number of filters is known up front, so allocate the slice once.
+	p.runningFilters = make([]filter.Filter, 0, len(p.config.FilterConfig))
+	for _, c := range p.config.FilterConfig {
+		p.runningFilters = append(p.runningFilters, filter.GetFilter(c))
+	}
+}
+
+// Prepare injects the dependency modules (Sender and Gatherer) resolved from the module
+// container. It always returns nil; an unchecked type assertion panics when a module is
+// missing or has an unexpected type.
+func (p *Processor) Prepare() error {
+	log.Logger.Infof("%s module of %s namespace is in preparing stage", p.Name(), p.config.NameSpace())
+
+	senderModule := GetRunningModule(p.config.NameSpace(), constant.SenderModule)
+	p.sender = senderModule.(*Sender)
+
+	gathererModule := GetRunningModule(p.config.NameSpace(), constant.GathererModule)
+	p.gatherer = gathererModule.(*Gatherer)
+
+	return nil
+}
+
+// Boot fetches the data of Queue, does a series of processing, and then sends to Sender.
+// It blocks until ctx is cancelled, at which point Shutdown is called and Boot returns.
+func (p *Processor) Boot(ctx context.Context) {
+	log.Logger.Infof("%s module of %s namespace is running", p.Name(), p.config.NameSpace())
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	go func() {
+		defer wg.Done()
+		for {
+			// Check for cancellation before every dequeue. Otherwise a queue that keeps
+			// returning errors would take the `continue` path forever and never reach the
+			// ctx.Done() case at the bottom of the loop.
+			select {
+			case <-ctx.Done():
+				p.Shutdown()
+				return
+			default:
+			}
+
+			// fetch a new event from Queue of Gatherer
+			e, offset, err := p.gatherer.runningQueue.Consumer().Dequeue()
+			if err != nil {
+				// todo add metrics
+				log.Logger.Errorf("cannot get event from queue in %s namespace, error is: %v",
+					p.config.NameSpace(), err)
+				continue
+			}
+			c := &event.OutputEventContext{
+				Offset:  offset,
+				Context: make(map[string]event.Event),
+			}
+			// processing the event with filters, that put the necessary events to OutputEventContext.
+			c.Put(e)
+			for _, f := range p.runningFilters {
+				f.Process(c)
+			}
+
+			select {
+			// put the result into the Input channel of Sender. The Sender sets Input to nil
+			// while disconnected, in which case this send blocks until ctx is cancelled.
+			case p.sender.Input <- c:
+			case <-ctx.Done():
+				p.Shutdown()
+				return
+			}
+		}
+	}()
+	wg.Wait()
+}
+
+// Shutdown only logs that the Processor is closing; the Processor itself holds no resource
+// that is released here.
+func (p *Processor) Shutdown() {
+	log.Logger.Infof("%s module of %s namespace is closing", p.Name(), p.config.NameSpace())
+}
diff --git a/internal/satellite/module/sender.go b/internal/satellite/module/sender.go
new file mode 100644
index 0000000..54ac3ac
--- /dev/null
+++ b/internal/satellite/module/sender.go
@@ -0,0 +1,181 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package module
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/constant"
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/buffer"
+	fallbacker "github.com/apache/skywalking-satellite/plugins/fallbacker/api"
+	forwarder "github.com/apache/skywalking-satellite/plugins/forwarder/api"
+)
+
+// Sender is the forward module in Satellite. It buffers the incoming OutputEventContexts into
+// batches and forwards every batch through the Forwarders, guarded by the Fallbacker.
+type Sender struct {
+	// config is assigned in Init from the module config passed by the caller.
+	config *config.SenderConfig
+
+	// dependency plugins, created in Init.
+	runningForwarders []forwarder.Forwarder
+	runningFallbacker fallbacker.Fallbacker
+
+	// dependency modules, resolved from the module container in Prepare.
+	gatherer      *Gatherer
+	clientManager *ClientManager
+
+	// self components
+	// Input equals input while the client is connected and is set to nil on disconnection
+	// or shutdown, which blocks senders on it.
+	Input        chan *event.OutputEventContext // logic input channel
+	input        chan *event.OutputEventContext // physical input channel
+	listener     chan ClientStatus              // client status listener
+	flushChannel chan *buffer.BatchBuffer       // forwarder flush channel
+	buffer       *buffer.BatchBuffer            // cache the downstream input data
+}
+
+// Name returns the module name constant of the Sender.
+func (s *Sender) Name() string {
+	return constant.SenderModule
+}
+
+// Description returns a human-readable summary of the Sender module.
+func (s *Sender) Description() string {
+	return "forward the input events to external services, such as Kafka and OAP"
+}
+
+// Config returns the Sender config stored by Init. Before Init runs the stored pointer is
+// nil, so the returned interface wraps a nil *config.SenderConfig.
+func (s *Sender) Config() config.ModuleConfig {
+	return s.config
+}
+
+// Init stores the Sender config and creates the dependency plugins and self components:
+// the Fallbacker, one Forwarder per configured forwarder, the channels, and the first buffer.
+// It panics when cfg is not a *config.SenderConfig.
+func (s *Sender) Init(cfg config.ModuleConfig) {
+	// The config has to be assigned before the first log line: the log reads the namespace
+	// through s.config, which is still nil when Init is entered.
+	s.config = cfg.(*config.SenderConfig)
+	log.Logger.Infof("%s module of %s namespace is being initialized", s.Name(), s.config.NameSpace())
+	s.runningFallbacker = fallbacker.GetFallbacker(s.config.FallbackerConfig)
+	// The number of forwarders is known up front, so allocate the slice once.
+	s.runningForwarders = make([]forwarder.Forwarder, 0, len(s.config.ForwardersConfig))
+	for _, c := range s.config.ForwardersConfig {
+		s.runningForwarders = append(s.runningForwarders, forwarder.GetForwarder(c))
+	}
+	s.input = make(chan *event.OutputEventContext)
+	// The logic input starts out pointing at the physical input channel.
+	s.Input = s.input
+	s.listener = make(chan ClientStatus)
+	// One slot of buffering lets a single full batch be handed over without blocking.
+	s.flushChannel = make(chan *buffer.BatchBuffer, 1)
+	s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
+}
+
+// Prepare injects the dependency modules (ClientManager and Gatherer) resolved from the module
+// container and registers the client status listener to the ClientManager. It always returns
+// nil; an unchecked type assertion panics when a module is missing or has an unexpected type.
+func (s *Sender) Prepare() error {
+	log.Logger.Infof("%s module of %s namespace is in preparing stage", s.Name(), s.config.NameSpace())
+
+	clientManagerModule := GetRunningModule(s.config.NameSpace(), constant.ClientManagerModule)
+	s.clientManager = clientManagerModule.(*ClientManager)
+
+	gathererModule := GetRunningModule(s.config.NameSpace(), constant.GathererModule)
+	s.gatherer = gathererModule.(*Gatherer)
+
+	s.clientManager.RegisterListener(s.listener)
+	return nil
+}
+
+// Boot fetches the downstream input data and forward to external services, such as Kafka and OAP receiver.
+// It starts one goroutine that batches the input and one that forwards the batches, and blocks
+// until both have returned after ctx is cancelled.
+func (s *Sender) Boot(ctx context.Context) {
+	log.Logger.Infof("%s module of %s namespace is running", s.Name(), s.config.NameSpace())
+	var wg sync.WaitGroup
+	wg.Add(2)
+	// 1. keep fetching the downstream data when client connected, and put it into BatchBuffer.
+	// 2. When reaches the buffer limit or receives a timer flush signal, and put BatchBuffer into flushChannel.
+	go func() {
+		defer wg.Done()
+		timeTicker := time.NewTicker(time.Duration(s.config.FlushTime) * time.Millisecond)
+		// Release the ticker when this goroutine returns; it is not needed afterwards.
+		defer timeTicker.Stop()
+		for {
+			select {
+			case status := <-s.listener:
+				switch status {
+				case Connected:
+					log.Logger.Infof("%s module of %s namespace is notified the connection is connected", s.Name(), s.config.NameSpace())
+					// Re-open the logic input so that upstream sends succeed again.
+					s.Input = s.input
+				case Disconnect:
+					log.Logger.Infof("%s module of %s namespace is notified the connection is disconnected", s.Name(), s.config.NameSpace())
+					// A nil channel blocks senders and is never selected for receive below.
+					s.Input = nil
+				}
+			case <-timeTicker.C:
+				// Timer flush: only flush when more than MinFlushEvents are buffered.
+				if s.buffer.Len() > s.config.MinFlushEvents {
+					s.flushChannel <- s.buffer
+					s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
+				}
+			case e := <-s.Input:
+				s.buffer.Add(e)
+				// Size flush: hand the buffer over as soon as it is full.
+				if s.buffer.Len() == s.config.MaxBufferSize {
+					s.flushChannel <- s.buffer
+					s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
+				}
+			case <-ctx.Done():
+				s.Input = nil
+				return
+			}
+		}
+	}()
+	// Keep fetching BatchBuffer to forward.
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case b := <-s.flushChannel:
+				s.consume(b)
+			case <-ctx.Done():
+				s.Shutdown()
+				return
+			}
+		}
+	}()
+	wg.Wait()
+}
+
+// Shutdown closes the channels and tries to force forward the events in the buffer: batches
+// already waiting in the flush channel are forwarded first, then the current buffer.
+func (s *Sender) Shutdown() {
+	log.Logger.Infof("%s module of %s namespace is closing", s.Name(), s.config.NameSpace())
+	close(s.input)
+	// Drain the pending batches without blocking. Ranging over flushChannel here would wait
+	// for the channel to be closed, which only happens at the end of this function.
+	for drained := false; !drained; {
+		select {
+		case buf := <-s.flushChannel:
+			s.consume(buf)
+		default:
+			drained = true
+		}
+	}
+	s.consume(s.buffer)
+	close(s.flushChannel)
+}
+
+// consume would forward the events by type and ack this batch.
+func (s *Sender) consume(batch *buffer.BatchBuffer) {
+	log.Logger.Infof("%s module of %s namespace is flushing a new batch buffer. the start offset is %d, and the batch size is %d",
+		s.Name(), s.config.NameSpace(), batch.First(), batch.BatchSize())
+	var events = make(map[event.Type]event.BatchEvents)
+	for i := 0; i < batch.Len(); i++ {
+		eventContext := batch.Buf()[i]
+		for _, e := range eventContext.Context {
+			if e.IsRemote() {
+				events[e.Type()] = append(events[e.Type()], e)
+			}
+		}
+	}
+
+	for _, f := range s.runningForwarders {
+		for t, batchEvents := range events {
+			if f.ForwardType() == t {
+				s.runningFallbacker.FallBack(batchEvents, s.clientManager.GetConnectedClient(), f.Forward, func() {
+					s.clientManager.ReportError()
+				})
+			}
+		}
+	}
+	s.gatherer.Ack(batch.First(), batch.BatchSize())
+}
diff --git a/internal/pkg/plugin/define.go b/plugins/README.md
similarity index 81%
rename from internal/pkg/plugin/define.go
rename to plugins/README.md
index ad95698..c67bca2 100644
--- a/internal/pkg/plugin/define.go
+++ b/plugins/README.md
@@ -1,22 +1,4 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package plugin
-
+```
 // The following graph illustrates the relationship between different plugin interface in api package.
 //
 //
@@ -75,10 +57,4 @@ package plugin
 // There are four stages in the lifecycle of Satellite plugins, which are the initial phase,
 // preparing phase, running phase, and closing phase. In the running phase, each plugin has
 // its own interface definition. However, the other three phases have to be defined uniformly.
-
-type Plugin interface {
-	// Description returns the description of the specific plugin.
-	Description() string
-	// Init initialize the specific plugin.
-	InitPlugin(config map[string]interface{})
-}
+```
\ No newline at end of file
diff --git a/plugins/client/api/client.go b/plugins/client/api/client.go
index 51d2b65..33f61a6 100644
--- a/plugins/client/api/client.go
+++ b/plugins/client/api/client.go
@@ -27,21 +27,24 @@ import (
 type Client interface {
 	plugin.Plugin
 
-	// Prepare would make connection with outer service.
-	Prepare()
+	// Connect would make connection with outer service.
+	Connect() error
+
+	// Return the status of the client.
+	IsConnected() bool
+
 	// GetConnection returns the connected client to publish events.
 	GetConnectedClient() interface{}
+
 	// Close the connection with outer service.
-	Close()
+	Close() error
 }
 
-var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
-
 // Get client plugin.
-func GetClient(pluginName string, config map[string]interface{}) Client {
-	return plugin.Get(ClientCategory, pluginName, config).(Client)
+func GetClient(config plugin.DefaultConfig) Client {
+	return plugin.Get(reflect.TypeOf((*Client)(nil)).Elem(), config).(Client)
 }
 
 func init() {
-	plugin.AddPluginCategory(ClientCategory)
+	plugin.RegisterPluginCategory(reflect.TypeOf((*Client)(nil)).Elem(), nil, nil, nil)
 }
diff --git a/plugins/client/example/client.go b/plugins/client/example/client.go
deleted file mode 100644
index 77b29c7..0000000
--- a/plugins/client/example/client.go
+++ /dev/null
@@ -1,67 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-type demoClient struct {
-	a string
-}
-type demoClient2 struct {
-	a string
-}
-type demoClient3 struct {
-	a string
-}
-
-func (d demoClient) Description() string {
-	panic("implement me")
-}
-
-func (d demoClient) InitPlugin(config map[string]interface{}) {
-
-}
-
-func (d demoClient) Prepare() {
-	panic("implement me")
-}
-
-func (d demoClient) GetConnectedClient() interface{} {
-	panic("implement me")
-}
-
-func (d demoClient) Close() {
-	panic("implement me")
-}
-
-func (d *demoClient2) Description() string {
-	panic("implement me")
-}
-
-func (d *demoClient2) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoClient2) Prepare() {
-	panic("implement me")
-}
-
-func (d *demoClient2) GetConnectedClient() interface{} {
-	panic("implement me")
-}
-
-func (d *demoClient2) Close() {
-	panic("implement me")
-}
diff --git a/plugins/collector/api/collector.go b/plugins/collector/api/collector.go
index 374d63a..a756974 100644
--- a/plugins/collector/api/collector.go
+++ b/plugins/collector/api/collector.go
@@ -27,7 +27,7 @@ import (
 //   Init()     Initial stage: Init plugin by config
 //    ||
 //    \/
-//   Prepare()   Preparing stage: Prepare the collector, such as build connection with SkyWalking javaagent.
+//   Prepare()   Preparing stage: Prepare the collector, such as build connection with SkyWalking javaagent.
 //    ||
 //    \/
 //   Next()     Running stage: When Collector collect a data, the data would be fetched by the upstream
@@ -38,22 +38,21 @@ import (
 // Collector is a plugin interface, that defines new collectors.
 type Collector interface {
 	plugin.Plugin
-
-	// Prepare creates a listen or reader to gather data.
-	Prepare()
+	// Prepare creates a listener or reader to gather APM data.
+	Prepare() error
 	// Next return the data from the input.
-	Next() (event.SerializableEvent, error)
+	EventChannel() <-chan event.SerializableEvent
 	// Close would close collector.
-	Close()
+	Close() error
 }
 
 var CollectorCategory = reflect.TypeOf((*Collector)(nil)).Elem()
 
 // Get collector plugin.
-func GetCollector(pluginName string, config map[string]interface{}) Collector {
-	return plugin.Get(CollectorCategory, pluginName, config).(Collector)
+func GetCollector(config plugin.DefaultConfig) Collector {
+	return plugin.Get(CollectorCategory, config).(Collector)
 }
 
 func init() {
-	plugin.AddPluginCategory(CollectorCategory)
+	plugin.RegisterPluginCategory(CollectorCategory, nil, nil, nil)
 }
diff --git a/plugins/collector/example/collector.go b/plugins/collector/example/collector.go
deleted file mode 100644
index 40b9b8b..0000000
--- a/plugins/collector/example/collector.go
+++ /dev/null
@@ -1,70 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import "github.com/apache/skywalking-satellite/internal/pkg/event"
-
-type demoCollector struct {
-	a string
-}
-
-type demoCollector2 struct {
-	a string
-}
-
-type demoCollector3 struct {
-	a string
-}
-
-func (d *demoCollector) Description() string {
-	panic("implement me")
-}
-
-func (d *demoCollector) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoCollector) Prepare() {
-	panic("implement me")
-}
-
-func (d *demoCollector) Next() (event.SerializableEvent, error) {
-	panic("implement me")
-}
-
-func (d *demoCollector) Close() {
-	panic("implement me")
-}
-
-func (d demoCollector2) Description() string {
-	panic("implement me")
-}
-
-func (d demoCollector2) InitPlugin(config map[string]interface{}) {
-}
-
-func (d demoCollector2) Prepare() {
-	panic("implement me")
-}
-
-func (d demoCollector2) Next() (event.SerializableEvent, error) {
-	panic("implement me")
-}
-
-func (d demoCollector2) Close() {
-	panic("implement me")
-}
diff --git a/plugins/collector/example/collector_test.go b/plugins/collector/example/collector_test.go
deleted file mode 100644
index e73f3bd..0000000
--- a/plugins/collector/example/collector_test.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
-	"testing"
-
-	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
-	"github.com/apache/skywalking-satellite/plugins/collector/api"
-)
-
-func Test_Register(t *testing.T) {
-	tests := []struct {
-		name  string
-		args  interface{}
-		panic bool
-	}{
-		{
-			name: "demoCollector",
-			args: &demoCollector{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoCollector2",
-			args: demoCollector2{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoCollector3",
-			args: demoCollector3{
-				a: "s",
-			},
-			panic: true,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			plugin.RegisterPlugin(tt.name, tt.args)
-			assertPanic(t, tt.name, nil, tt.panic)
-		})
-	}
-}
-
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
-	defer func() {
-		if r := recover(); r != nil && !existPanic {
-			t.Errorf("the plugin %s is not pass", name)
-		}
-	}()
-	api.GetCollector(name, config)
-}
diff --git a/plugins/fallbacker/api/fallbacker.go b/plugins/fallbacker/api/fallbacker.go
index bde56ef..91dea45 100644
--- a/plugins/fallbacker/api/fallbacker.go
+++ b/plugins/fallbacker/api/fallbacker.go
@@ -22,23 +22,24 @@ import (
 
 	"github.com/apache/skywalking-satellite/internal/pkg/event"
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/plugins/forwarder/api"
 )
 
 // Fallbacker is a plugin interface, that defines some fallback strategies.
 type Fallbacker interface {
 	plugin.Plugin
-
 	//  FallBack returns nil when finishing a successful process and returns a new Fallbacker when failure.
-	FallBack(batch event.BatchEvents) Fallbacker
+	FallBack(batch event.BatchEvents, connection interface{}, forward api.ForwardFunc, callback DisconnectionCallback) Fallbacker
 }
 
-var FallbackerCategory = reflect.TypeOf((*Fallbacker)(nil)).Elem()
+type DisconnectionCallback func()
 
 // Get Fallbacker plugin.
-func GetFallbacker(pluginName string, config map[string]interface{}) Fallbacker {
-	return plugin.Get(FallbackerCategory, pluginName, config).(Fallbacker)
+func GetFallbacker(config plugin.DefaultConfig) Fallbacker {
+	return plugin.Get(reflect.TypeOf((*Fallbacker)(nil)).Elem(), config).(Fallbacker)
 }
 
+// init register the Fallbacker interface
 func init() {
-	plugin.AddPluginCategory(FallbackerCategory)
+	plugin.RegisterPluginCategory(reflect.TypeOf((*Fallbacker)(nil)).Elem(), nil, nil, nil)
 }
diff --git a/plugins/fallbacker/example/fallbacker.go b/plugins/fallbacker/example/fallbacker.go
deleted file mode 100644
index b933e96..0000000
--- a/plugins/fallbacker/example/fallbacker.go
+++ /dev/null
@@ -1,57 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
-	"github.com/apache/skywalking-satellite/internal/pkg/event"
-	"github.com/apache/skywalking-satellite/plugins/fallbacker/api"
-)
-
-type demoFallbacker struct {
-	a string
-}
-
-type demoFallbacker2 struct {
-	a string
-}
-
-type demoFallbacker3 struct {
-	a string
-}
-
-func (d *demoFallbacker) Description() string {
-	panic("implement me")
-}
-
-func (d *demoFallbacker) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoFallbacker) FallBack(batch event.BatchEvents) api.Fallbacker {
-	panic("implement me")
-}
-
-func (d demoFallbacker2) Description() string {
-	panic("implement me")
-}
-
-func (d demoFallbacker2) InitPlugin(config map[string]interface{}) {
-}
-
-func (d demoFallbacker2) FallBack(batch event.BatchEvents) api.Fallbacker {
-	panic("implement me")
-}
diff --git a/plugins/fallbacker/example/fallbacker_test.go b/plugins/fallbacker/example/fallbacker_test.go
deleted file mode 100644
index f5f445f..0000000
--- a/plugins/fallbacker/example/fallbacker_test.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
-	"testing"
-
-	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
-	"github.com/apache/skywalking-satellite/plugins/fallbacker/api"
-)
-
-func Test_Register(t *testing.T) {
-	tests := []struct {
-		name  string
-		args  interface{}
-		panic bool
-	}{
-		{
-			name: "demoFallbacker",
-			args: &demoFallbacker{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoFallbacker2",
-			args: demoFallbacker2{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoFallbacker3",
-			args: demoFallbacker3{
-				a: "s",
-			},
-			panic: true,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			plugin.RegisterPlugin(tt.name, tt.args)
-			assertPanic(t, tt.name, nil, tt.panic)
-		})
-	}
-}
-
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
-	defer func() {
-		if r := recover(); r != nil && !existPanic {
-			t.Errorf("the plugin %s is not pass", name)
-		}
-	}()
-	api.GetFallbacker(name, config)
-}
diff --git a/plugins/fallbacker/timer/timer_fallbacker.go b/plugins/fallbacker/timer/timer_fallbacker.go
new file mode 100644
index 0000000..b86981e
--- /dev/null
+++ b/plugins/fallbacker/timer/timer_fallbacker.go
@@ -0,0 +1,62 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package timer
+
+import (
+	"time"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/plugins/forwarder/api"
+)
+
+// Fallbacker retries a failed forward on a timer. LatencyFactor is the wait before the first retry in
+// milliseconds; the wait doubles after every failed retry, and MaxTimes caps the total forward attempts.
+type Fallbacker struct {
+	MaxTimes      int `mapstructure:"max_times"`      // exported: mapstructure skips fields it cannot set
+	LatencyFactor int `mapstructure:"latency_factor"` // base retry delay in milliseconds
+}
+
+// Name returns the plugin name, "timer-fallbacker".
+func (t *Fallbacker) Name() string {
+	return "timer-fallbacker"
+}
+
+// Description returns a one-sentence summary of what this plugin does.
+func (t *Fallbacker) Description() string {
+	return "this is a timer trigger when forward fails."
+}
+
+// DefaultConfig returns the default YAML settings: max_times 3 and latency_factor 2000 (milliseconds).
+func (t *Fallbacker) DefaultConfig() string {
+	return `
+max_times: 3
+latency_factor: 2000
+`
+}
+
+// FallBack forwards batch, retrying with a doubling delay until it succeeds or MaxTimes attempts were made.
+func (t *Fallbacker) FallBack(batch event.BatchEvents, connection interface{}, forward api.ForwardFunc) {
+	latency := time.Duration(t.LatencyFactor) * time.Millisecond
+	for count := 1; ; count++ { // count must advance, or a persistent failure would retry forever
+		if forward(connection, batch) == nil || count >= t.MaxTimes {
+			return
+		}
+		time.Sleep(latency)
+		latency *= 2
+	}
+}
diff --git a/plugins/filter/api/filter.go b/plugins/filter/api/filter.go
index 0ed43db..bc8e9bb 100644
--- a/plugins/filter/api/filter.go
+++ b/plugins/filter/api/filter.go
@@ -34,17 +34,17 @@ import (
 type Filter interface {
 	plugin.Plugin
 
-	// Process produces a new event by processing incoming event.
-	Process(in event.Event) event.Event
+	// Process would fetch the needed event
+	Process(context *event.OutputEventContext)
 }
 
 var FilterCategory = reflect.TypeOf((*Filter)(nil)).Elem()
 
 // Get filter plugin.
-func GetFilter(pluginName string, config map[string]interface{}) Filter {
-	return plugin.Get(FilterCategory, pluginName, config).(Filter)
+func GetFilter(config plugin.DefaultConfig) Filter {
+	return plugin.Get(FilterCategory, config).(Filter)
 }
 
 func init() {
-	plugin.AddPluginCategory(FilterCategory)
+	plugin.RegisterPluginCategory(FilterCategory, nil, nil, nil)
 }
diff --git a/plugins/filter/example/filter_test.go b/plugins/filter/example/filter_test.go
deleted file mode 100644
index aae4371..0000000
--- a/plugins/filter/example/filter_test.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
-	"testing"
-
-	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
-	"github.com/apache/skywalking-satellite/plugins/filter/api"
-)
-
-func Test_Register(t *testing.T) {
-	tests := []struct {
-		name  string
-		args  interface{}
-		panic bool
-	}{
-		{
-			name: "demoFilter",
-			args: &demoFilter{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoFilter2",
-			args: demoFilter2{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoFilter3",
-			args: demoFilter3{
-				a: "s",
-			},
-			panic: true,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			plugin.RegisterPlugin(tt.name, tt.args)
-			assertPanic(t, tt.name, nil, tt.panic)
-		})
-	}
-}
-
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
-	defer func() {
-		if r := recover(); r != nil && !existPanic {
-			t.Errorf("the plugin %s is not pass", name)
-		}
-	}()
-	api.GetFilter(name, config)
-}
diff --git a/plugins/forwarder/api/forwarder.go b/plugins/forwarder/api/forwarder.go
index 9e99f14..b0509cd 100644
--- a/plugins/forwarder/api/forwarder.go
+++ b/plugins/forwarder/api/forwarder.go
@@ -24,34 +24,25 @@ import (
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
 
-//   Init()     Initiating stage: Init plugin by config
-//    ||
-//    \/
-//   Prepare()   Preparing stage: Prepare the Forwarder, such as get remote client.
-//    ||
-//    \/
-//   Forward()  Running stage: Forward the batch events
-//    ||
-//    \/
-//   Close()    Closing stage: Close the Collector, such as close connection with SkyWalking javaagent.
-
 // Forwarder is a plugin interface, that defines new forwarders.
 type Forwarder interface {
 	plugin.Plugin
 
 	// Forward the batch events to the external services, such as Kafka MQ and SkyWalking OAP cluster.
-	Forward(batch event.BatchEvents)
-
-	// ForwardType returns the supporting event type that could be forwarded.
+	Forward(connection interface{}, batch event.BatchEvents) error
+	// ForwardType returns the supported event type.
 	ForwardType() event.Type
 }
 
-var ForwarderCategory = reflect.TypeOf((*Forwarder)(nil)).Elem()
+// ForwardFunc represents the Forward() method of Forwarder as a standalone function type.
+type ForwardFunc func(connection interface{}, batch event.BatchEvents) error
 
-func GetForwarder(pluginName string, config map[string]interface{}) Forwarder {
-	return plugin.Get(ForwarderCategory, pluginName, config).(Forwarder)
+// GetForwarder returns an initialized forwarder plugin.
+func GetForwarder(config map[string]interface{}) Forwarder {
+	return plugin.Get(reflect.TypeOf((*Forwarder)(nil)).Elem(), config).(Forwarder)
 }
 
+// init registers the Forwarder interface as a plugin category.
 func init() {
-	plugin.AddPluginCategory(ForwarderCategory)
+	plugin.RegisterPluginCategory(reflect.TypeOf((*Forwarder)(nil)).Elem(), nil, nil, nil)
 }
diff --git a/plugins/forwarder/example/forwarder.go b/plugins/forwarder/example/forwarder.go
deleted file mode 100644
index 637f9e1..0000000
--- a/plugins/forwarder/example/forwarder.go
+++ /dev/null
@@ -1,62 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import "github.com/apache/skywalking-satellite/internal/pkg/event"
-
-type demoForwarder struct {
-	a string
-}
-
-type demoForwarder2 struct {
-	a string
-}
-
-type demoForwarder3 struct {
-	a string
-}
-
-func (d *demoForwarder) Description() string {
-	panic("implement me")
-}
-
-func (d *demoForwarder) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoForwarder) Forward(batch event.BatchEvents) {
-	panic("implement me")
-}
-
-func (d demoForwarder2) Description() string {
-	panic("implement me")
-}
-
-func (d demoForwarder2) InitPlugin(config map[string]interface{}) {
-}
-
-func (d demoForwarder2) Forward(batch event.BatchEvents) {
-	panic("implement me")
-}
-
-func (d demoForwarder2) ForwardType() event.Type {
-	panic("implement me")
-}
-
-func (d *demoForwarder) ForwardType() event.Type {
-	panic("implement me")
-}
diff --git a/plugins/forwarder/example/forwarder_test.go b/plugins/forwarder/example/forwarder_test.go
deleted file mode 100644
index 28de35d..0000000
--- a/plugins/forwarder/example/forwarder_test.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
-	"testing"
-
-	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
-	"github.com/apache/skywalking-satellite/plugins/forwarder/api"
-)
-
-func Test_Register(t *testing.T) {
-	tests := []struct {
-		name  string
-		args  interface{}
-		panic bool
-	}{
-		{
-			name: "demoForwarder",
-			args: &demoForwarder{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoForwarder2",
-			args: demoForwarder2{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoForwarder3",
-			args: demoForwarder3{
-				a: "s",
-			},
-			panic: true,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			plugin.RegisterPlugin(tt.name, tt.args)
-			assertPanic(t, tt.name, nil, tt.panic)
-		})
-	}
-}
-
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
-	defer func() {
-		if r := recover(); r != nil && !existPanic {
-			t.Errorf("the plugin %s is not pass", name)
-		}
-	}()
-	api.GetForwarder(name, config)
-}
diff --git a/plugins/parser/api/parser.go b/plugins/parser/api/parser.go
index 3ca5ebe..cba3bca 100644
--- a/plugins/parser/api/parser.go
+++ b/plugins/parser/api/parser.go
@@ -38,12 +38,10 @@ type Parser interface {
 	ParseStr(str string) ([]event.SerializableEvent, error)
 }
 
-var ParserCategory = reflect.TypeOf((*Parser)(nil)).Elem()
-
-func GetParser(pluginName string, config map[string]interface{}) Parser {
-	return plugin.Get(ParserCategory, pluginName, config).(Parser)
+func GetParser(pluginName string, config plugin.DefaultConfig) Parser {
+	return plugin.Get(reflect.TypeOf((*Parser)(nil)).Elem(), config).(Parser)
 }
 
 func init() {
-	plugin.AddPluginCategory(ParserCategory)
+	plugin.RegisterPluginCategory(reflect.TypeOf((*Parser)(nil)).Elem(), nil, nil, nil)
 }
diff --git a/plugins/parser/example/parser.go b/plugins/parser/example/parser.go
deleted file mode 100644
index 839956f..0000000
--- a/plugins/parser/example/parser.go
+++ /dev/null
@@ -1,62 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import "github.com/apache/skywalking-satellite/internal/pkg/event"
-
-type demoParser struct {
-	a string
-}
-
-type demoParser2 struct {
-	a string
-}
-
-type demoParser3 struct {
-	a string
-}
-
-func (d *demoParser) Description() string {
-	panic("implement me")
-}
-
-func (d *demoParser) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoParser) ParseBytes(bytes []byte) ([]event.SerializableEvent, error) {
-	panic("implement me")
-}
-
-func (d *demoParser) ParseStr(str string) ([]event.SerializableEvent, error) {
-	panic("implement me")
-}
-
-func (d demoParser2) Description() string {
-	panic("implement me")
-}
-
-func (d demoParser2) InitPlugin(config map[string]interface{}) {
-}
-
-func (d demoParser2) ParseBytes(bytes []byte) ([]event.SerializableEvent, error) {
-	panic("implement me")
-}
-
-func (d demoParser2) ParseStr(str string) ([]event.SerializableEvent, error) {
-	panic("implement me")
-}
diff --git a/plugins/parser/example/parser_test.go b/plugins/parser/example/parser_test.go
deleted file mode 100644
index b577f7d..0000000
--- a/plugins/parser/example/parser_test.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
-	"testing"
-
-	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
-	"github.com/apache/skywalking-satellite/plugins/parser/api"
-)
-
-func Test_Register(t *testing.T) {
-	tests := []struct {
-		name  string
-		args  interface{}
-		panic bool
-	}{
-		{
-			name: "demoParser",
-			args: &demoParser{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoParser2",
-			args: demoParser2{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoParser3",
-			args: demoParser3{
-				a: "s",
-			},
-			panic: true,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			plugin.RegisterPlugin(tt.name, tt.args)
-			assertPanic(t, tt.name, nil, tt.panic)
-		})
-	}
-}
-
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
-	defer func() {
-		if r := recover(); r != nil && !existPanic {
-			t.Errorf("the plugin %s is not pass", name)
-		}
-	}()
-	api.GetParser(name, config)
-}
diff --git a/plugins/queue/api/queue.go b/plugins/queue/api/queue.go
index 9b95156..f6d4d69 100644
--- a/plugins/queue/api/queue.go
+++ b/plugins/queue/api/queue.go
@@ -28,6 +28,9 @@ import (
 type Queue interface {
 	plugin.Plugin
 
+	// Prepare creates the queue.
+	Prepare() error
+
 	// Publisher get the only publisher for the current queue.
 	Publisher() QueuePublisher
 
@@ -35,27 +38,30 @@ type Queue interface {
 	Consumer() QueueConsumer
 
 	// Close would close the queue.
-	Close()
+	Close() error
+
+	// Ack a batch
+	Ack(startOffset int64, batchSize int) chan struct{}
 }
 
 // QueuePublisher is a plugin interface, that defines new queue publishers.
 type QueuePublisher interface {
 	// Enqueue push a inputEvent into the queue.
-	Enqueue(event *event.SerializableEvent) error
+	Enqueue(event event.SerializableEvent) error
 }
 
 // QueueConsumer is a plugin interface, that defines new queue consumers.
 type QueueConsumer interface {
 	// Dequeue pop an event form the Queue. When the queue is empty, the method would be blocked.
-	Dequeue() (event *event.SerializableEvent, offset int64, err error)
+	Dequeue() (event event.SerializableEvent, offset int64, err error)
 }
 
 var QueueCategory = reflect.TypeOf((*Queue)(nil)).Elem()
 
-func GetQueue(pluginName string, config map[string]interface{}) Queue {
-	return plugin.Get(QueueCategory, pluginName, config).(Queue)
+func GetQueue(config plugin.DefaultConfig) Queue {
+	return plugin.Get(QueueCategory, config).(Queue)
 }
 
 func init() {
-	plugin.AddPluginCategory(QueueCategory)
+	plugin.RegisterPluginCategory(QueueCategory, nil, nil, nil)
 }
diff --git a/plugins/queue/example/queue.go b/plugins/queue/example/queue.go
deleted file mode 100644
index 5a3055a..0000000
--- a/plugins/queue/example/queue.go
+++ /dev/null
@@ -1,72 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
-	"github.com/apache/skywalking-satellite/plugins/queue/api"
-)
-
-type demoQueue struct {
-	a string
-}
-
-type demoQueue2 struct {
-	a string
-}
-
-type demoQueue3 struct {
-	a string
-}
-
-func (d *demoQueue) Description() string {
-	panic("implement me")
-}
-
-func (d *demoQueue) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoQueue) Publisher() api.QueuePublisher {
-	panic("implement me")
-}
-
-func (d *demoQueue) Consumer() api.QueueConsumer {
-	panic("implement me")
-}
-
-func (d *demoQueue) Close() {
-	panic("implement me")
-}
-
-func (d demoQueue2) Description() string {
-	panic("implement me")
-}
-
-func (d demoQueue2) InitPlugin(config map[string]interface{}) {
-}
-
-func (d demoQueue2) Publisher() api.QueuePublisher {
-	panic("implement me")
-}
-
-func (d demoQueue2) Consumer() api.QueueConsumer {
-	panic("implement me")
-}
-
-func (d demoQueue2) Close() {
-	panic("implement me")
-}
diff --git a/plugins/queue/example/queue_test.go b/plugins/queue/example/queue_test.go
deleted file mode 100644
index 0afd20d..0000000
--- a/plugins/queue/example/queue_test.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
-	"testing"
-
-	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
-	"github.com/apache/skywalking-satellite/plugins/queue/api"
-)
-
-func Test_Register(t *testing.T) {
-	tests := []struct {
-		name  string
-		args  interface{}
-		panic bool
-	}{
-		{
-			name: "demoQueue",
-			args: &demoQueue{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoQueue2",
-			args: demoQueue2{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoQueue3",
-			args: demoQueue3{
-				a: "s",
-			},
-			panic: true,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			plugin.RegisterPlugin(tt.name, tt.args)
-			assertPanic(t, tt.name, nil, tt.panic)
-		})
-	}
-}
-
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
-	defer func() {
-		if r := recover(); r != nil && !existPanic {
-			t.Errorf("the plugin %s is not pass", name)
-		}
-	}()
-	api.GetQueue(name, config)
-}