You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by li...@apache.org on 2020/12/08 11:13:53 UTC

[skywalking-satellite] 01/01: add satellite main structure

This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a commit to branch boot-framework
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git

commit c0939b1f361f071c4fe4ba668dfca0264397acfc
Author: Evan <ev...@outlook.com>
AuthorDate: Tue Dec 8 19:13:07 2020 +0800

    add satellite main structure
---
 main.go => cmd/command.go                          |  33 ++-
 main.go => cmd/main.go                             |  22 +-
 configs/{config.yaml => satellite_config.yaml}     |  61 ++--
 dist/LICENSE                                       |   4 +-
 dist/licenses/LICENSE-cli                          |  21 ++
 dist/licenses/LICENSE-viper                        |  21 ++
 go.mod                                             |   6 +-
 go.sum                                             | 306 +++++++++++++++++++++
 main.go => internal/pkg/constant/constant.go       |  15 +-
 internal/pkg/event/event.go                        |   8 +-
 internal/pkg/{logger => log}/log.go                |  92 +++++--
 internal/pkg/{logger => log}/log_test.go           |  44 ++-
 internal/pkg/plugin/default_plugin.go              |  76 +++++
 main.go => internal/pkg/plugin/definition.go       |  19 +-
 internal/pkg/plugin/plugin_test.go                 | 101 +++++++
 internal/pkg/plugin/registry.go                    |  65 +++--
 main.go => internal/pkg/test/path.go               |  18 +-
 internal/satellite/boot/boot.go                    | 105 +++++++
 .../satellite/config/clientmanager_config.go       |  34 +--
 .../satellite/config/gatherer_config.go            |  44 +--
 internal/satellite/config/loader.go                |  68 +++++
 internal/satellite/config/loader_test.go           | 133 +++++++++
 .../satellite/config/processor_config.go           |  23 +-
 internal/satellite/config/satellite_config.go      |  51 ++++
 .../satellite/config/sender_config.go              |  36 ++-
 internal/satellite/module/buffer/buffer.go         |  94 +++++++
 .../satellite/module/buffer/buffer_test.go         |  65 ++---
 internal/satellite/module/clientmanager.go         | 182 ++++++++++++
 internal/satellite/module/definition.go            |  98 +++++++
 internal/satellite/module/gatherer.go              | 112 ++++++++
 internal/satellite/module/processor.go             | 115 ++++++++
 internal/satellite/module/sender.go                | 181 ++++++++++++
 internal/pkg/plugin/define.go => plugins/README.md |  28 +-
 plugins/client/api/client.go                       |  19 +-
 plugins/client/example/client.go                   |  67 -----
 plugins/collector/api/collector.go                 |  17 +-
 plugins/collector/example/collector.go             |  70 -----
 plugins/collector/example/collector_test.go        |  71 -----
 plugins/fallbacker/api/fallbacker.go               |  13 +-
 plugins/fallbacker/example/fallbacker.go           |  57 ----
 plugins/fallbacker/example/fallbacker_test.go      |  71 -----
 plugins/fallbacker/timer/timer_fallbacker.go       |  62 +++++
 plugins/filter/api/filter.go                       |  10 +-
 plugins/filter/example/filter_test.go              |  71 -----
 plugins/forwarder/api/forwarder.go                 |  27 +-
 plugins/forwarder/example/forwarder.go             |  62 -----
 plugins/forwarder/example/forwarder_test.go        |  71 -----
 plugins/parser/api/parser.go                       |   8 +-
 plugins/parser/example/parser.go                   |  62 -----
 plugins/parser/example/parser_test.go              |  71 -----
 plugins/queue/api/queue.go                         |  18 +-
 plugins/queue/example/queue.go                     |  72 -----
 plugins/queue/example/queue_test.go                |  71 -----
 53 files changed, 2171 insertions(+), 1100 deletions(-)

diff --git a/main.go b/cmd/command.go
similarity index 54%
copy from main.go
copy to cmd/command.go
index 94fbc40..0457071 100644
--- a/main.go
+++ b/cmd/command.go
@@ -17,6 +17,35 @@
 
 package main
 
-func main() {
-	print("OK")
+import (
+	"github.com/urfave/cli/v2"
+
+	"github.com/apache/skywalking-satellite/internal/satellite/boot"
+	"github.com/apache/skywalking-satellite/internal/satellite/config"
+)
+
+var (
+	cmdStart = cli.Command{
+		Name:  "start",
+		Usage: "start satellite",
+		Flags: []cli.Flag{
+			&cli.StringFlag{
+				Name:    "config, c",
+				Usage:   "Load configuration from `FILE`",
+				EnvVars: []string{"MOSN_CONFIG"},
+				Value:   "configs/satellite_config.yaml",
+			},
+		},
+		Action: func(c *cli.Context) error {
+
+			cfg := loadConfig(c)
+			return boot.Start(cfg)
+		},
+	}
+)
+
+func loadConfig(c *cli.Context) *config.SatelliteConfig {
+	configPath := c.String("config")
+	cfg := config.Load(configPath)
+	return cfg
 }
diff --git a/main.go b/cmd/main.go
similarity index 61%
copy from main.go
copy to cmd/main.go
index 94fbc40..8c2d809 100644
--- a/main.go
+++ b/cmd/main.go
@@ -17,6 +17,26 @@
 
 package main
 
+import (
+	"os"
+	"time"
+
+	"github.com/urfave/cli/v2"
+)
+
+// Version will be initialized when building
+var Version string = "lastest"
+
 func main() {
-	print("OK")
+	app := cli.NewApp()
+	app.Name = "SkyWalking-Satellite"
+	app.Version = Version
+	app.Compiled = time.Now()
+	app.Usage = "Satellite is for collecting APM data."
+	app.Description = "A lightweight collector/sidecar could be deployed closing to the target monitored system, to collect metrics, traces, and logs."
+	app.Commands = []*cli.Command{
+		&cmdStart,
+	}
+	app.Action = cli.ShowAppHelp
+	_ = app.Run(os.Args)
 }
diff --git a/configs/config.yaml b/configs/satellite_config.yaml
similarity index 54%
rename from configs/config.yaml
rename to configs/satellite_config.yaml
index 232aecb..68caab2 100644
--- a/configs/config.yaml
+++ b/configs/satellite_config.yaml
@@ -15,34 +15,35 @@
 # limitations under the License.
 #
 
-Gatherer:
-  - name: segment-receiver
-    type: segment-receiver
-    config:
-      key: value
-      key2: value2
-    queue:
-      type: mmap-queue
-      config:
+logger:
+  log_pattern: "%time [%level][%field] - %msg"
+  time_pattern: "2006-01-02 15:04:05.001"
+  level: "info"
+
+agents:
+  - common_config:
+      namespace: namespace1
+    gatherer:
+      collector:
+        plugin_name: segment-receiver
+        k: v
+      queue:
+        plugin_name: mmap-queue
         key: value
-        key2: value2
-    processor: processor1
-Processor:
-  - name: processor1
-    filters:
-      - filtername1
-      - filtername2
-      - filtername3
-Sender:
-  client:
-    name: gRPC-client
-    type: gRPC
-    config:
-      key1: value1
-      key2: value2
-  forwarders:
-    - type: segment-forwarder
-      eventType: segment
-      config:
-        key1: value1
-        key2: value2
\ No newline at end of file
+    processor:
+      filters:
+        - plugin_name: filtertype1
+          key: value
+    sender:
+      flush_time: 5000
+      max_buffer_size: 100
+      forwarders:
+        - plugin_name: segment-forwarder
+          key: value
+  - common_config:
+      namespace: sharing
+    client_manager:
+      retry_interval: 5000
+      client:
+        plugin_name: "grpc"
+        k: v
diff --git a/dist/LICENSE b/dist/LICENSE
index 2ab0d2f..618fe55 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -231,4 +231,6 @@ MIT licenses
 The following components are provided under the MIT License. See project link for details.
 The text of each license is also included at licenses/LICENSE-[project].txt.
 
-	sirupsen (logrus) 1.7.0: https://github.com/sirupsen/logrus MIT
+	sirupsen (logrus) v1.7.0: https://github.com/sirupsen/logrus MIT
+	spf13 (viper) v1.7.1: https://github.com/spf13/viper MIT
+	urfave (cli) v2.3.0: https://github.com/urfave/cli MIT
diff --git a/dist/licenses/LICENSE-cli b/dist/licenses/LICENSE-cli
new file mode 100644
index 0000000..17356d3
--- /dev/null
+++ b/dist/licenses/LICENSE-cli
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2016 Jeremy Saenz & Contributors
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/dist/licenses/LICENSE-viper b/dist/licenses/LICENSE-viper
new file mode 100644
index 0000000..4527efb
--- /dev/null
+++ b/dist/licenses/LICENSE-viper
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2014 Steve Francia
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 71fd69d..438ceff 100644
--- a/go.mod
+++ b/go.mod
@@ -2,4 +2,8 @@ module github.com/apache/skywalking-satellite
 
 go 1.14
 
-require github.com/sirupsen/logrus v1.7.0
+require (
+	github.com/sirupsen/logrus v1.7.0
+	github.com/spf13/viper v1.7.1
+	github.com/urfave/cli/v2 v2.3.0
+)
diff --git a/go.sum b/go.sum
index 4d74a1e..89357d1 100644
--- a/go.sum
+++ b/go.sum
@@ -1,10 +1,316 @@
+cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
+cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU=
+cloud.google.com/go v0.44.2/go.mod h1:60680Gw3Yr4ikxnPRS/oxxkBccT6SA1yMk63TGekxKY=
+cloud.google.com/go v0.45.1/go.mod h1:RpBamKRgapWJb87xiFSdk4g1CME7QZg3uwTez+TSTjc=
+cloud.google.com/go v0.46.3/go.mod h1:a6bKKbmY7er1mI7TEI4lsAkts/mkhTSZK8w33B4RAg0=
+cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
+cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
+cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk=
+cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
+cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
+dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
+github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
+github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
+github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
+github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
+github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
+github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
+github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
+github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
+github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
+github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
+github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
+github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
+github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
+github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
+github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
+github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
+github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
+github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY=
+github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
+github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
+github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
+github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
+github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
+github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
+github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
+github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
+github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
+github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
+github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
+github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
+github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
+github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
+github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
+github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
+github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
+github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
+github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
+github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
+github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
+github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
+github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
+github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
+github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
+github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
+github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
+github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
+github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
+github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
+github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
+github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
+github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
+github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
+github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
+github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
+github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
+github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
+github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
+github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
+github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
+github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
+github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
+github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
+github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
+github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
+github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
+github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
+github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
+github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
+github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
+github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
+github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
+github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
+github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
+github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
+github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
+github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
+github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
+github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
+github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
+github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
+github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
+github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
+github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
+github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
+github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
+github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
+github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
+github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
+github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
+github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
+github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
+github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
+github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
+github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
+github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
+github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
+github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
+github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
+github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
+github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q=
+github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
+github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
+github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
+github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
+github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
+github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
 github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
+github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
+github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
+github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
+github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
+github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
+github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
+github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
+github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
+github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
+github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
+github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
+github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
+github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
+github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
+github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
+github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
+github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
+github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M=
+github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
+github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
+go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
+go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
+go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
+go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
+golang.org/x/exp v0.0.0-20190829153037-c13cbed26979/go.mod h1:86+5VVa7VpoJ4kLfm080zCjGlMRFzhUhsZKEZO7MGek=
+golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY=
+golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
+golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
+golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
+golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
+golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
+golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
+golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
+golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
+golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
+golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
+golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
+golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4=
 golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
+golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
+golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
+golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
+golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc h1:NCy3Ohtk6Iny5V/reW2Ktypo4zIpWBdRJ1uFMjBxdg8=
+golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
+google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
+google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
+google.golang.org/api v0.9.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
+google.golang.org/api v0.13.0/go.mod h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsbkAI=
+google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
+google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
+google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
+google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
+google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
+google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
+google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
+google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8=
+google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
+google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
+google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
+google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
+gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
+gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
+gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
+gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
+gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
+gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
+gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
+rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
diff --git a/main.go b/internal/pkg/constant/constant.go
similarity index 77%
copy from main.go
copy to internal/pkg/constant/constant.go
index 94fbc40..f1ce73b 100644
--- a/main.go
+++ b/internal/pkg/constant/constant.go
@@ -15,8 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package main
+package constant
 
-func main() {
-	print("OK")
-}
+const (
+	// project name
+	ProjectName = "skywalking-satellite"
+
+	// Module names
+	ClientManagerModule = "client-manager"
+	GathererModule      = "gatherer"
+	ProcessorModule     = "processor"
+	SenderModule        = "sender"
+)
diff --git a/internal/pkg/event/event.go b/internal/pkg/event/event.go
index c8ff0f2..20c6187 100644
--- a/internal/pkg/event/event.go
+++ b/internal/pkg/event/event.go
@@ -73,20 +73,18 @@ type BatchEvents []Event
 
 // OutputEventContext is a container to store the output context.
 type OutputEventContext struct {
-	context map[string]Event
+	Context map[string]Event
 	Offset  int64
 }
 
 // Put puts the incoming event into the context when the event is a remote event.
 func (c *OutputEventContext) Put(event Event) {
-	if event.IsRemote() {
-		c.context[event.Name()] = event
-	}
+	c.Context[event.Name()] = event
 }
 
 // Get returns a event in the context. When the eventName does not exist, a error would be returned.
 func (c *OutputEventContext) Get(eventName string) (Event, error) {
-	e, ok := c.context[eventName]
+	e, ok := c.Context[eventName]
 	if !ok {
 		err := fmt.Errorf("cannot find the event name in OutputEventContext : %s", eventName)
 		return nil, err
diff --git a/internal/pkg/logger/log.go b/internal/pkg/log/log.go
similarity index 51%
rename from internal/pkg/logger/log.go
rename to internal/pkg/log/log.go
index a62128d..9918dd4 100644
--- a/internal/pkg/logger/log.go
+++ b/internal/pkg/log/log.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package logger
+package log
 
 import (
 	"fmt"
@@ -32,57 +32,97 @@ const (
 	defaultTimePattern = "2006-01-02 15:04:05.001"
 )
 
+// LoggerConfig initializes the global logger config.
+type LoggerConfig struct {
+	LogPattern  string `mapstructure:"log_pattern"`
+	TimePattern string `mapstructure:"time_pattern"`
+	Level       string `mapstructure:"level"`
+}
+
+// FormatOption is a function to set formatter config.
+type FormatOption func(f *formatter)
+
+// FormatOption is a function to set logger config.
+type ConfigOption func(l *logrus.Logger)
+
 type formatter struct {
 	logPattern  string
 	timePattern string
 }
 
-// Option is a function to set formatter config.
-type Option func(f *formatter)
-
-// Log is the global logger.
-var Log *logrus.Logger
+// Logger is the global logger.
+var Logger *logrus.Logger
 var once sync.Once
 
-// The Log init method, keep Log as a singleton.
-func Init(opts ...Option) {
+func Init(cfg *LoggerConfig) {
 	once.Do(func() {
-		if Log == nil {
-			Log = logrus.New()
-		}
-		Log.SetOutput(os.Stdout)
-		Log.SetLevel(logrus.InfoLevel)
-		f := &formatter{}
-		for _, opt := range opts {
-			opt(f)
-		}
-		if f.logPattern == "" {
-			f.logPattern = defaultLogPattern
+		var configOpts []ConfigOption
+		var formatOpts []FormatOption
+
+		if cfg.Level != "" {
+			configOpts = append(configOpts, SetLevel(cfg.Level))
+		} else {
+			configOpts = append(configOpts, SetLevel(logrus.InfoLevel.String()))
 		}
-		if f.timePattern == "" {
-			f.timePattern = defaultTimePattern
+		if cfg.TimePattern != "" {
+			formatOpts = append(formatOpts, SetTimePattern(cfg.TimePattern))
+		} else {
+			formatOpts = append(formatOpts, SetTimePattern(defaultTimePattern))
 		}
-		if !strings.Contains(f.logPattern, "\n") {
-			f.logPattern += "\n"
+		if cfg.LogPattern != "" {
+			formatOpts = append(formatOpts, SetLogPattern(cfg.LogPattern))
+		} else {
+			formatOpts = append(formatOpts, SetLogPattern(defaultLogPattern))
 		}
-		Log.SetFormatter(f)
+		initBySettings(configOpts, formatOpts)
 	})
 }
 
+// The Logger init method, keep Logger as a singleton.
+func initBySettings(configOpts []ConfigOption, formatOpts []FormatOption) {
+	// Default Logger.
+	Logger = logrus.New()
+	Logger.SetOutput(os.Stdout)
+	for _, opt := range configOpts {
+		opt(Logger)
+	}
+	// Default formatter.
+	f := &formatter{}
+	for _, opt := range formatOpts {
+		opt(f)
+	}
+	if !strings.Contains(f.logPattern, "\n") {
+		f.logPattern += "\n"
+	}
+	Logger.SetFormatter(f)
+}
+
 // Set the log pattern in formatter.
-func SetLogPattern(logPattern string) Option {
+func SetLogPattern(logPattern string) FormatOption {
 	return func(f *formatter) {
 		f.logPattern = logPattern
 	}
 }
 
 // Set the time pattern in formatter.
-func SetTimePattern(timePattern string) Option {
+func SetTimePattern(timePattern string) FormatOption {
 	return func(f *formatter) {
 		f.timePattern = timePattern
 	}
 }
 
+// Set the time pattern in formatter.
+func SetLevel(levelStr string) ConfigOption {
+	return func(logger *logrus.Logger) {
+		level, err := logrus.ParseLevel(levelStr)
+		if err != nil {
+			fmt.Printf("logger level does not exist: %s, level would be set info", levelStr)
+			level = logrus.InfoLevel
+		}
+		logger.SetLevel(level)
+	}
+}
+
 // Format supports unified log output format that has %time, %level, %field and %msg.
 func (f *formatter) Format(entry *logrus.Entry) ([]byte, error) {
 	output := f.logPattern
diff --git a/internal/pkg/logger/log_test.go b/internal/pkg/log/log_test.go
similarity index 66%
rename from internal/pkg/logger/log_test.go
rename to internal/pkg/log/log_test.go
index d8ad895..2c690d8 100644
--- a/internal/pkg/logger/log_test.go
+++ b/internal/pkg/log/log_test.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package logger
+package log
 
 import (
 	"reflect"
@@ -26,7 +26,13 @@ import (
 )
 
 func TestFormatter_Format(t *testing.T) {
-	Init(SetLogPattern("[%time][%level][%field] - %msg"), SetTimePattern("2006-01-02 15:04:05,001"))
+	initBySettings([]ConfigOption{
+		SetLevel("warn"),
+	},
+		[]FormatOption{
+			SetLogPattern("[%time][%level][%field] - %msg"),
+			SetTimePattern("2006-01-02 15:04:05,001"),
+		})
 	type args struct {
 		entry *logrus.Entry
 	}
@@ -41,7 +47,7 @@ func TestFormatter_Format(t *testing.T) {
 			want: []byte("[2020-12-12 12:12:12,012][trace][] - entry1\n"),
 			args: args{
 				entry: func() *logrus.Entry {
-					entry := Log.WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
+					entry := Logger.WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
 					entry.Level = logrus.TraceLevel
 					entry.Message = "entry1"
 					return entry
@@ -53,7 +59,7 @@ func TestFormatter_Format(t *testing.T) {
 			want: []byte("[2020-12-12 12:12:12,012][warning][a=b] - entry2\n"),
 			args: args{
 				entry: func() *logrus.Entry {
-					entry := Log.WithField("a", "b").WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
+					entry := Logger.WithField("a", "b").WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
 					entry.Level = logrus.WarnLevel
 					entry.Message = "entry2"
 					return entry
@@ -64,7 +70,7 @@ func TestFormatter_Format(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			f := Log.Formatter
+			f := Logger.Formatter
 			got, _ := f.Format(tt.args.entry)
 			if !reflect.DeepEqual(got, tt.want) {
 				t.Errorf("Format() got = %s, want %s", got, tt.want)
@@ -72,3 +78,31 @@ func TestFormatter_Format(t *testing.T) {
 		})
 	}
 }
+
+func TestSetLevel(t *testing.T) {
+	type args struct {
+		opts ConfigOption
+	}
+	tests := []struct {
+		name string
+		args args
+		want logrus.Level
+	}{
+		{
+			name: "info",
+			args: args{
+				opts: SetLevel("warn"),
+			},
+			want: logrus.WarnLevel,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			logger := logrus.New()
+			tt.args.opts(logger)
+			if logger.Level != tt.want {
+				t.Errorf("SetLevel() got = %s, want %s", logger.Level, tt.want)
+			}
+		})
+	}
+}
diff --git a/internal/pkg/plugin/default_plugin.go b/internal/pkg/plugin/default_plugin.go
new file mode 100644
index 0000000..cab5adc
--- /dev/null
+++ b/internal/pkg/plugin/default_plugin.go
@@ -0,0 +1,76 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plugin
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/spf13/viper"
+)
+
+// DefaultPluginNameField is a required field in DefaultConfig.
+const DefaultPluginNameField = "plugin_name"
+
+// DefaultInitializingPlugin defines the plugins initialized by defaultInitializing.
+type DefaultInitializingPlugin interface {
+	Plugin
+	// DefaultConfig returns the default config, that is a YAML pattern.
+	DefaultConfig() string
+}
+
+// DefaultConfig is used to initialize the DefaultInitializingPlugin.
+type DefaultConfig map[string]interface{}
+
+// defaultNameFinder is used to get the plugin name in DefaultConfig.
+func defaultNameFinder(cfg interface{}) string {
+	c, ok := cfg.(DefaultConfig)
+	if !ok {
+		panic(fmt.Errorf("defaultNameFinder only supports DefaultConfig"))
+	}
+	name, ok := c[DefaultPluginNameField]
+	if !ok {
+		panic(fmt.Errorf("%s is requeired in DefaultConfig", DefaultPluginNameField))
+	}
+	return name.(string)
+}
+
+// defaultInitializing initialize the fields by fields mapping.
+func defaultInitializing(plugin Plugin, cfg interface{}) {
+	c, ok := cfg.(DefaultConfig)
+	if !ok {
+		panic(fmt.Errorf("%s plugin is a DefaultInitializingPlugin, but the type of configuration is illegal", plugin.Name()))
+	}
+	v := viper.New()
+	v.SetConfigType("yaml")
+	p := plugin.(DefaultInitializingPlugin)
+	if p.DefaultConfig() != "" {
+		if err := v.ReadConfig(strings.NewReader(p.DefaultConfig())); err != nil {
+			panic(fmt.Errorf("cannot read default config in the plugin: %s, the error is %v", plugin.Name(), err))
+		}
+	}
+	if err := v.MergeConfigMap(c); err != nil {
+		panic(fmt.Errorf("%s plugin cannot merge the custom configuration, the error is %v", plugin.Name(), err))
+	}
+	if err := v.Unmarshal(plugin); err != nil {
+		panic(fmt.Errorf("cannot inject  the config to the %s plugin, the error is %v", plugin.Name(), err))
+	}
+}
+
+// defaultCallBack does nothing.
+func defaultCallBack(plugin Plugin) {}
diff --git a/main.go b/internal/pkg/plugin/definition.go
similarity index 58%
copy from main.go
copy to internal/pkg/plugin/definition.go
index 94fbc40..bf73689 100644
--- a/main.go
+++ b/internal/pkg/plugin/definition.go
@@ -15,8 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package main
+package plugin
 
-func main() {
-	print("OK")
+// Plugin defines the plugin model in Satellite.
+type Plugin interface {
+	// Name returns the name of the specific plugin.
+	Name() string
+	// Description returns the description of the specific plugin.
+	Description() string
 }
+
+// NameFinderFunc is used to get the plugin name from different plugin configs.
+type NameFinderFunc func(config interface{}) string
+
+// InitializingFunc is used to initialize the specific plugin.
+type InitializingFunc func(plugin Plugin, config interface{})
+
+// CallbackFunc would be invoked after initializing.
+type CallbackFunc func(plugin Plugin)
diff --git a/internal/pkg/plugin/plugin_test.go b/internal/pkg/plugin/plugin_test.go
new file mode 100644
index 0000000..a14eb3b
--- /dev/null
+++ b/internal/pkg/plugin/plugin_test.go
@@ -0,0 +1,101 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plugin
+
+import (
+	"reflect"
+	"testing"
+)
+
+type DemoCategory interface {
+	DefaultInitializingPlugin
+	Say() string
+}
+
+type DemoPlugin struct {
+	Organization string `mapstructure:"organization"`
+	Project      string `mapstructure:"project"`
+}
+
+func (d *DemoPlugin) Say() string {
+	return d.Organization + ":" + d.Project
+}
+
+func (d *DemoPlugin) Name() string {
+	return "demoplugin"
+}
+
+func (d *DemoPlugin) Description() string {
+	return "this is just a demo"
+}
+
+func (d *DemoPlugin) DefaultConfig() string {
+	return `
+organization: "ASF"
+project: "skywalking-satellite"
+`
+}
+
+func TestPlugin(t *testing.T) {
+	tests := []struct {
+		name string
+		args DefaultConfig
+		want *DemoPlugin
+	}{
+		{
+			name: "test1",
+			args: DefaultConfig{
+				"plugin_name":  "demoplugin",
+				"organization": "CNCF",
+				"project":      "Fluentd",
+			},
+			want: &DemoPlugin{
+				Organization: "CNCF",
+				Project:      "Fluentd",
+			},
+		},
+		{
+			name: "demoplugin",
+			args: DefaultConfig{
+				"plugin_name": "demoplugin",
+			},
+			want: &DemoPlugin{
+				Organization: "ASF",
+				Project:      "skywalking-satellite",
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			defer func() {
+				if i := recover(); i != nil {
+					t.Errorf("the plugin %s is not exist", "demoplugin")
+				}
+			}()
+			plugin := Get(reflect.TypeOf((*DemoCategory)(nil)).Elem(), tt.args)
+			if !reflect.DeepEqual(plugin, tt.want) {
+				t.Errorf("Format() got = %v, want %v", plugin, tt.want)
+			}
+		})
+	}
+}
+
+func init() {
+	RegisterPluginCategory(reflect.TypeOf((*DemoCategory)(nil)).Elem(), nil, nil, nil)
+	RegisterPlugin(&DemoPlugin{})
+}
diff --git a/internal/pkg/plugin/registry.go b/internal/pkg/plugin/registry.go
index 4731d36..9059d66 100644
--- a/internal/pkg/plugin/registry.go
+++ b/internal/pkg/plugin/registry.go
@@ -23,57 +23,86 @@ import (
 	"sync"
 )
 
-// All plugins is wrote in ./plugins dir. The plugin type would be as the next level dirs,
-// such as collector, client, or queue. And the 3rd level is the plugin name, that is also
-// used as key in pluginRegistry.
-
-// reg is the global plugin registry
+// the global plugin registry
 var (
-	reg  map[reflect.Type]map[string]reflect.Value
-	lock sync.Mutex
+	lock              sync.Mutex
+	reg               map[reflect.Type]map[string]reflect.Value
+	initFuncReg       map[reflect.Type]InitializingFunc
+	callbackFuncReg   map[reflect.Type]CallbackFunc
+	nameFinderFuncReg map[reflect.Type]NameFinderFunc
 )
 
 func init() {
 	reg = make(map[reflect.Type]map[string]reflect.Value)
+	initFuncReg = make(map[reflect.Type]InitializingFunc)
+	callbackFuncReg = make(map[reflect.Type]CallbackFunc)
+	nameFinderFuncReg = make(map[reflect.Type]NameFinderFunc)
 }
 
-// Add new plugin category. The different plugin category could have same plugin names.
-func AddPluginCategory(pluginCategory reflect.Type) {
+// RegisterCategory register new plugin category with default InitializingFunc.
+// required:
+// pluginCategory: the plugin interface type.
+// Optional:
+// n: the plugin name finder,and the default value is defaultNameFinder
+// i, the plugin initializer, and the default value is defaultInitializing
+// c, the plugin initializer callback func, and the default value is defaultCallBack
+func RegisterPluginCategory(pluginCategory reflect.Type, n NameFinderFunc, i InitializingFunc, c CallbackFunc) {
 	lock.Lock()
 	defer lock.Unlock()
 	reg[pluginCategory] = map[string]reflect.Value{}
+
+	if n == nil {
+		nameFinderFuncReg[pluginCategory] = defaultNameFinder
+	} else {
+		nameFinderFuncReg[pluginCategory] = n
+	}
+	if i == nil {
+		initFuncReg[pluginCategory] = defaultInitializing
+	} else {
+		initFuncReg[pluginCategory] = i
+	}
+	if c == nil {
+		callbackFuncReg[pluginCategory] = defaultCallBack
+	} else {
+		callbackFuncReg[pluginCategory] = c
+	}
 }
 
 // RegisterPlugin registers the pluginType as plugin.
 // If the plugin is a pointer receiver, please pass a pointer. Otherwise, please pass a value.
-func RegisterPlugin(pluginName string, plugin interface{}) {
+func RegisterPlugin(plugin Plugin) {
 	lock.Lock()
 	defer lock.Unlock()
 	v := reflect.ValueOf(plugin)
 	success := false
 	for pCategory, pReg := range reg {
 		if v.Type().Implements(pCategory) {
-			pReg[pluginName] = v
-			fmt.Printf("register %s %s successfully ", pluginName, v.Type().String())
+			pReg[plugin.Name()] = v
+			fmt.Printf("register %s %s successfully ", plugin.Name(), v.Type().String())
 			success = true
 		}
 	}
 	if !success {
-		fmt.Printf("this type of %s is not supported to register : %s", pluginName, v.Type().String())
+		fmt.Printf("this type of %s is not supported to register : %s", plugin.Name(), v.Type().String())
 	}
 }
 
-// Get the specific plugin according to the pluginCategory and pluginName.
-func Get(pluginCategory reflect.Type, pluginName string, config map[string]interface{}) Plugin {
-	value, ok := reg[pluginCategory][pluginName]
+// Get an initialized specific plugin according to the pluginCategory and pluginName.
+func Get(category reflect.Type, cfg interface{}) Plugin {
+	lock.Lock()
+	defer lock.Unlock()
+	pluginName := nameFinderFuncReg[category](cfg)
+	value, ok := reg[category][pluginName]
 	if !ok {
-		panic(fmt.Errorf("cannot find %s plugin, and the category of plugin is %s", pluginName, pluginCategory))
+		panic(fmt.Errorf("cannot find %s plugin, and the category of plugin is %s", pluginName, category))
 	}
 	t := value.Type()
 	if t.Kind() == reflect.Ptr {
 		t = t.Elem()
 	}
+
 	plugin := reflect.New(t).Interface().(Plugin)
-	plugin.InitPlugin(config)
+	initFuncReg[category](plugin, cfg)
+	callbackFuncReg[category](plugin)
 	return plugin
 }
diff --git a/main.go b/internal/pkg/test/path.go
similarity index 69%
copy from main.go
copy to internal/pkg/test/path.go
index 94fbc40..cf5adcc 100644
--- a/main.go
+++ b/internal/pkg/test/path.go
@@ -15,8 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package main
+package test
 
-func main() {
-	print("OK")
+import (
+	"fmt"
+	"os"
+	"strings"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/constant"
+)
+
+func FindRootPath() (string, error) {
+	pwd, err := os.Getwd()
+	if err != nil {
+		return "", fmt.Errorf("could not find the project root path: %v", err)
+	}
+	return pwd[0 : strings.Index(pwd, constant.ProjectName)+len(constant.ProjectName)], nil
 }
diff --git a/internal/satellite/boot/boot.go b/internal/satellite/boot/boot.go
new file mode 100644
index 0000000..b8ae2db
--- /dev/null
+++ b/internal/satellite/boot/boot.go
@@ -0,0 +1,105 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package boot
+
+import (
+	"context"
+	"os"
+	"sync"
+	"syscall"
+
+	"os/signal"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/module"
+)
+
+// Start Satellite.
+func Start(cfg *config.SatelliteConfig) error {
+	ctx, cancel := context.WithCancel(context.Background())
+	addShutdownListener(cancel)
+	initLogger(cfg)
+	initModules(cfg)
+	if err := prepareModules(); err != nil {
+		return err
+	}
+	bootModules(ctx)
+	return nil
+}
+
+// addShutdownListener add a close signal listener.
+func addShutdownListener(cancel context.CancelFunc) {
+	signals := make(chan os.Signal, 1)
+	signal.Notify(signals, os.Interrupt, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT)
+	go func() {
+		<-signals
+		cancel()
+	}()
+}
+
+// initLogger init the global logger.
+func initLogger(cfg *config.SatelliteConfig) {
+	log.Init(cfg.Logger)
+}
+
+// initModules init the modules and register the modules to the module container.
+func initModules(cfg *config.SatelliteConfig) {
+	log.Logger.Infof("satellite is initializing...")
+	for _, agentConfig := range cfg.Agents {
+		if agentConfig.ClientManager != nil {
+			module.NewModuleService(agentConfig.ClientManager)
+		}
+		if agentConfig.Sender != nil {
+			module.NewModuleService(agentConfig.Sender)
+		}
+		if agentConfig.Processor != nil {
+			module.NewModuleService(agentConfig.Processor)
+		}
+		if agentConfig.Gatherer != nil {
+			module.NewModuleService(agentConfig.Gatherer)
+		}
+	}
+}
+
+// prepareModules makes that all modules are in a bootable state.
+func prepareModules() error {
+	log.Logger.Infof("satellite is prepare to start...")
+	for _, service := range module.GetModuleContainer() {
+		if err := service.Prepare(); err != nil {
+			log.Logger.Errorf("%s module of %s namespace is error in preparing stage, error is %v", service.Name(), service.Config().NameSpace(), err)
+			return err
+		}
+	}
+	return nil
+}
+
+// prepareModules boot all modules.
+func bootModules(ctx context.Context) {
+	log.Logger.Infof("satellite is starting...")
+	var wg sync.WaitGroup
+	for _, service := range module.GetModuleContainer() {
+		service := service
+		go func() {
+			defer wg.Done()
+			wg.Add(1)
+			service.Boot(ctx)
+		}()
+	}
+	wg.Wait()
+}
diff --git a/plugins/client/api/client.go b/internal/satellite/config/clientmanager_config.go
similarity index 58%
copy from plugins/client/api/client.go
copy to internal/satellite/config/clientmanager_config.go
index 51d2b65..3f8bdfa 100644
--- a/plugins/client/api/client.go
+++ b/internal/satellite/config/clientmanager_config.go
@@ -15,33 +15,29 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package api
+package config
 
 import (
-	"reflect"
-
+	"github.com/apache/skywalking-satellite/internal/pkg/constant"
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
 
-// Client is a plugin interface, that defines new clients, such as gRPC client and Kafka client.
-type Client interface {
-	plugin.Plugin
+// Config defines the initialization params for ClientManager.
+type ClientManagerConfig struct {
+	// common config
+	common ModuleCommonConfig
 
-	// Prepare would make connection with outer service.
-	Prepare()
-	// GetConnection returns the connected client to publish events.
-	GetConnectedClient() interface{}
-	// Close the connection with outer service.
-	Close()
-}
+	// plugins config
+	ClientConfig plugin.DefaultConfig `mapstructure:"client"` // the client plugin config
 
-var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
+	// self config
+	RetryInterval int64 `mapstructure:"retry_interval"` // the client retry interval when disconnected.
+}
 
-// Get client plugin.
-func GetClient(pluginName string, config map[string]interface{}) Client {
-	return plugin.Get(ClientCategory, pluginName, config).(Client)
+func (c *ClientManagerConfig) ModuleName() string {
+	return constant.ClientManagerModule
 }
 
-func init() {
-	plugin.AddPluginCategory(ClientCategory)
+func (c *ClientManagerConfig) NameSpace() string {
+	return c.common.RunningNamespace
 }
diff --git a/plugins/filter/example/filter.go b/internal/satellite/config/gatherer_config.go
similarity index 56%
rename from plugins/filter/example/filter.go
rename to internal/satellite/config/gatherer_config.go
index 14abb69..ded14e1 100644
--- a/plugins/filter/example/filter.go
+++ b/internal/satellite/config/gatherer_config.go
@@ -15,40 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package example
+package config
 
-import "github.com/apache/skywalking-satellite/internal/pkg/event"
+import (
+	"github.com/apache/skywalking-satellite/internal/pkg/constant"
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
 
-type demoFilter struct {
-	a string
-}
-
-type demoFilter2 struct {
-	a string
-}
-
-type demoFilter3 struct {
-	a string
-}
-
-func (d *demoFilter) Description() string {
-	panic("implement me")
-}
-
-func (d *demoFilter) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoFilter) Process(in event.Event) event.Event {
-	panic("implement me")
-}
+type GathererConfig struct {
+	// common config
+	common ModuleCommonConfig
 
-func (d demoFilter2) Description() string {
-	panic("implement me")
+	// plugins config
+	CollectorConfig plugin.DefaultConfig `mapstructure:"collector"` // collector plugin config
+	QueueConfig     plugin.DefaultConfig `mapstructure:"queue"`     // queue plugin config
 }
 
-func (d demoFilter2) InitPlugin(config map[string]interface{}) {
+func (g *GathererConfig) ModuleName() string {
+	return constant.GathererModule
 }
 
-func (d demoFilter2) Process(in event.Event) event.Event {
-	panic("implement me")
+func (g *GathererConfig) NameSpace() string {
+	return g.common.RunningNamespace
 }
diff --git a/internal/satellite/config/loader.go b/internal/satellite/config/loader.go
new file mode 100644
index 0000000..c9b0dac
--- /dev/null
+++ b/internal/satellite/config/loader.go
@@ -0,0 +1,68 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package config
+
+import (
+	"bytes"
+	"fmt"
+	"sync"
+
+	"io/ioutil"
+	"path/filepath"
+
+	"github.com/spf13/viper"
+)
+
+var (
+	cfgLock sync.Mutex
+)
+
+// Load SatelliteConfig. The func could not use global logger of Satellite, because it is executed before logger initialization.
+func Load(configPath string) *SatelliteConfig {
+	cfgLock.Lock()
+	defer cfgLock.Unlock()
+	fmt.Printf("load config from : %s\n", configPath)
+	cfg, err := load(configPath)
+	if err != nil {
+		panic(fmt.Errorf("could not load config form the path: %s, the error is :%v", configPath, err))
+	} else {
+		return cfg
+	}
+}
+
+// load SatelliteConfig from the yaml config.
+func load(configPath string) (*SatelliteConfig, error) {
+	absolutePath, err := filepath.Abs(configPath)
+	if err != nil {
+		return nil, err
+	}
+	content, err := ioutil.ReadFile(absolutePath)
+	if err != nil {
+		return nil, err
+	}
+	v := viper.New()
+	v.SetConfigType("yaml")
+	cfg := SatelliteConfig{}
+	if err := v.ReadConfig(bytes.NewReader(content)); err != nil {
+		return nil, err
+	}
+	if err := v.Unmarshal(&cfg); err != nil {
+		return nil, err
+	}
+	return &cfg, nil
+}
diff --git a/internal/satellite/config/loader_test.go b/internal/satellite/config/loader_test.go
new file mode 100644
index 0000000..e31c7f2
--- /dev/null
+++ b/internal/satellite/config/loader_test.go
@@ -0,0 +1,133 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package config
+
+import (
+	"encoding/json"
+	"reflect"
+	"testing"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/internal/pkg/test"
+)
+
+func TestLoad(t *testing.T) {
+	rootPath, err := test.FindRootPath()
+	if err != nil {
+		panic(err)
+	}
+	type args struct {
+		configPath string
+	}
+	tests := []struct {
+		name string
+		args args
+		want *SatelliteConfig
+	}{
+		{
+			name: "Legal configuration",
+			args: args{configPath: rootPath + "/configs/satellite_config.yaml"},
+			want: &SatelliteConfig{
+				Logger: &log.LoggerConfig{
+					LogPattern:  "%time [%level][%field] - %msg",
+					TimePattern: "2006-01-02 15:04:05.001",
+					Level:       "info",
+				},
+				Agents: []*AgentConfig{
+					{
+						ModuleCommonConfig: &ModuleCommonConfig{
+							RunningNamespace: "namespace1",
+						},
+
+						Gatherer: &GathererConfig{
+							CollectorConfig: plugin.DefaultConfig{
+								"plugin_name": "segment-receiver",
+								"k":           "v",
+							},
+							QueueConfig: plugin.DefaultConfig{
+								"plugin_name": "mmap-queue",
+								"key":         "value",
+							},
+						},
+						Processor: &ProcessorConfig{
+							FilterConfig: []plugin.DefaultConfig{
+								{
+									"plugin_name": "filtertype1",
+									"key":         "value",
+								},
+							},
+						},
+						Sender: &SenderConfig{
+							FlushTime:     5000,
+							MaxBufferSize: 100,
+							ForwardersConfig: []plugin.DefaultConfig{
+								{
+									"plugin_name": "segment-forwarder",
+									"key":         "value",
+								},
+							},
+						},
+					},
+					{
+						ModuleCommonConfig: &ModuleCommonConfig{
+							RunningNamespace: "sharing",
+						},
+						ClientManager: &ClientManagerConfig{
+							RetryInterval: 5000,
+							ClientConfig: plugin.DefaultConfig{
+								"plugin_name": "grpc",
+								"k":           "v",
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			c, err := load(tt.args.configPath)
+			if err != nil {
+				t.Fatalf("cannot load config: %v", err)
+			}
+			doJudgeEqual(t, c.Logger, tt.want.Logger)
+			doJudgeEqual(t, c.Agents[0].ModuleCommonConfig, tt.want.Agents[0].ModuleCommonConfig)
+			doJudgeEqual(t, c.Agents[0].Gatherer, tt.want.Agents[0].Gatherer)
+			doJudgeEqual(t, c.Agents[0].Processor, tt.want.Agents[0].Processor)
+			doJudgeEqual(t, c.Agents[0].Sender, tt.want.Agents[0].Sender)
+
+			doJudgeEqual(t, c.Agents[1].ModuleCommonConfig, tt.want.Agents[1].ModuleCommonConfig)
+			doJudgeEqual(t, c.Agents[1].ClientManager, tt.want.Agents[1].ClientManager)
+		})
+	}
+}
+
+func doJudgeEqual(t *testing.T, a, b interface{}) {
+	if !reflect.DeepEqual(a, b) {
+		ajson, err := json.Marshal(a)
+		if err != nil {
+			t.Fatalf("cannot do json format: %v", err)
+		}
+		bjson, err := json.Marshal(b)
+		if err != nil {
+			t.Fatalf("cannot do json format: %v", err)
+		}
+		t.Fatalf("config is not equal, got %s, want %s", ajson, bjson)
+	}
+}
diff --git a/main.go b/internal/satellite/config/processor_config.go
similarity index 62%
rename from main.go
rename to internal/satellite/config/processor_config.go
index 94fbc40..dd65844 100644
--- a/main.go
+++ b/internal/satellite/config/processor_config.go
@@ -15,8 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package main
+package config
 
-func main() {
-	print("OK")
+import (
+	"github.com/apache/skywalking-satellite/internal/pkg/constant"
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+type ProcessorConfig struct {
+	// common config
+	common *ModuleCommonConfig
+
+	// plugins config
+	FilterConfig []plugin.DefaultConfig `mapstructure:"filters"` // filter plugins
+}
+
+func (p *ProcessorConfig) ModuleName() string {
+	return constant.ProcessorModule
+}
+
+func (p *ProcessorConfig) NameSpace() string {
+	return p.common.RunningNamespace
 }
diff --git a/internal/satellite/config/satellite_config.go b/internal/satellite/config/satellite_config.go
new file mode 100644
index 0000000..9160cc6
--- /dev/null
+++ b/internal/satellite/config/satellite_config.go
@@ -0,0 +1,51 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package config
+
+import (
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+)
+
+// SatelliteConfig is to initialize Satellite.
+type SatelliteConfig struct {
+	Logger *log.LoggerConfig `mapstructure:"logger"`
+	Agents []*AgentConfig    `mapstructure:"agents"`
+}
+
+// AgentConfig initializes the different module in different namespace.
+type AgentConfig struct {
+	ModuleCommonConfig *ModuleCommonConfig  `mapstructure:"common_config"`
+	ClientManager      *ClientManagerConfig `mapstructure:"client_manager"`
+	Gatherer           *GathererConfig      `mapstructure:"gatherer"`
+	Processor          *ProcessorConfig     `mapstructure:"processor"`
+	Sender             *SenderConfig        `mapstructure:"sender"`
+}
+
+// ModuleConfig is an interface to define the initialization fields in ModuleService.
+type ModuleConfig interface {
+	// ModuleName returns the name of current module.
+	ModuleName() string
+	// NameSpace returns the current running namespace. Satellite support multi-agents running in one Satellite
+	// process, the namespace is a space concept to distinguish different agents.
+	NameSpace() string
+}
+
+// ModuleCommonConfig has some common fields of each ModuleConfig.
+type ModuleCommonConfig struct {
+	RunningNamespace string `mapstructure:"namespace"`
+}
diff --git a/plugins/client/api/client.go b/internal/satellite/config/sender_config.go
similarity index 52%
copy from plugins/client/api/client.go
copy to internal/satellite/config/sender_config.go
index 51d2b65..d2fa50d 100644
--- a/plugins/client/api/client.go
+++ b/internal/satellite/config/sender_config.go
@@ -15,33 +15,31 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package api
+package config
 
 import (
-	"reflect"
-
+	"github.com/apache/skywalking-satellite/internal/pkg/constant"
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
 
-// Client is a plugin interface, that defines new clients, such as gRPC client and Kafka client.
-type Client interface {
-	plugin.Plugin
+type SenderConfig struct {
+	// common config
+	common ModuleCommonConfig
 
-	// Prepare would make connection with outer service.
-	Prepare()
-	// GetConnection returns the connected client to publish events.
-	GetConnectedClient() interface{}
-	// Close the connection with outer service.
-	Close()
-}
+	// plugins config
+	ForwardersConfig []plugin.DefaultConfig `mapstructure:"forwarders"` // forwarder plugins config
+	FallbackerConfig plugin.DefaultConfig   `mapstructure:"fallbacker"` // fallbacker plugins config
 
-var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
+	// self config
+	MaxBufferSize  int `mapstructure:"max_buffer_size"`  // the max buffer capacity
+	MinFlushEvents int `mapstructure:"min_flush_events"` // the min flush events when receives a timer flush signal
+	FlushTime      int `mapstructure:"flush_time"`       // the period flush time
+}
 
-// Get client plugin.
-func GetClient(pluginName string, config map[string]interface{}) Client {
-	return plugin.Get(ClientCategory, pluginName, config).(Client)
+func (s *SenderConfig) ModuleName() string {
+	return constant.SenderModule
 }
 
-func init() {
-	plugin.AddPluginCategory(ClientCategory)
+func (s *SenderConfig) NameSpace() string {
+	return s.common.RunningNamespace
 }
diff --git a/internal/satellite/module/buffer/buffer.go b/internal/satellite/module/buffer/buffer.go
new file mode 100644
index 0000000..ec3c59e
--- /dev/null
+++ b/internal/satellite/module/buffer/buffer.go
@@ -0,0 +1,94 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package buffer
+
+import (
+	"sync"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+)
+
+// BatchBuffer is a buffer to cache the input data in Sender.
+type BatchBuffer struct {
+	sync.Mutex                             // local
+	buf        []*event.OutputEventContext // cache
+	first      int64                       // the first OutputEventContext offset
+	last       int64                       // the last OutputEventContext offset
+	size       int                         // usage size
+	cap        int                         // the max capacity
+}
+
+// NewBuffer crate a new BatchBuffer according to the capacity param.
+func NewBatchBuffer(capacity int) *BatchBuffer {
+	return &BatchBuffer{
+		buf:   make([]*event.OutputEventContext, capacity),
+		first: 0,
+		last:  0,
+		size:  0,
+		cap:   capacity,
+	}
+}
+
+// Buf returns the cached data in BatchBuffer.
+func (b *BatchBuffer) Buf() []*event.OutputEventContext {
+	b.Lock()
+	defer b.Unlock()
+	return b.buf
+}
+
+// First returns the first OutputEventContext offset.
+func (b *BatchBuffer) First() int64 {
+	b.Lock()
+	defer b.Unlock()
+	return b.first
+}
+
+// Len returns the usage size.
+func (b *BatchBuffer) Len() int {
+	b.Lock()
+	defer b.Unlock()
+	return b.size
+}
+
+// BatchSize returns the offset increment of the cached data.
+func (b *BatchBuffer) BatchSize() int {
+	b.Lock()
+	defer b.Unlock()
+	return int(b.last - b.first + 1)
+}
+
+// Add adds a new data input buffer.
+func (b *BatchBuffer) Add(data *event.OutputEventContext) {
+	b.Lock()
+	defer b.Unlock()
+	if b.size == b.cap {
+		log.Logger.Errorf("cannot add one item to the fulling BatchBuffer, the capacity is %d", b.cap)
+		return
+	} else if data.Offset <= 0 {
+		log.Logger.Errorf("cannot add one item to BatchBuffer because the input data is illegal, the offset is %d", data.Offset)
+		return
+	}
+	if b.size == 0 {
+		b.first = data.Offset
+	} else {
+		b.last = data.Offset
+	}
+	b.buf[b.size] = data
+	b.size++
+}
diff --git a/plugins/client/example/client_test.go b/internal/satellite/module/buffer/buffer_test.go
similarity index 54%
rename from plugins/client/example/client_test.go
rename to internal/satellite/module/buffer/buffer_test.go
index a9e355e..be9e9a8 100644
--- a/plugins/client/example/client_test.go
+++ b/internal/satellite/module/buffer/buffer_test.go
@@ -15,57 +15,58 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package example
+package buffer
 
 import (
 	"testing"
 
-	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
-	"github.com/apache/skywalking-satellite/plugins/client/api"
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
 )
 
-func Test_Register(t *testing.T) {
+func TestNewBuffer(t *testing.T) {
+	buffer := NewBatchBuffer(3)
 	tests := []struct {
-		name  string
-		args  interface{}
-		panic bool
+		name string
+		args *event.OutputEventContext
+		want int
 	}{
 		{
-			name: "demoClient",
-			args: demoClient{
-				a: "s",
-			},
-			panic: false,
+			name: "add-1",
+			args: &event.OutputEventContext{Offset: 1},
+			want: 1,
 		},
 		{
-			name: "demoClient2",
-			args: &demoClient2{
-				a: "s",
-			},
-			panic: false,
+			name: "add-2",
+			args: &event.OutputEventContext{Offset: 2},
+			want: 2,
 		},
 		{
-			name: "demoClient3",
-			args: demoClient3{
-				a: "s",
-			},
-			panic: true,
+			name: "add-3",
+			args: &event.OutputEventContext{Offset: 3},
+			want: 3,
+		},
+		{
+			name: "add-4",
+			args: &event.OutputEventContext{Offset: 4},
+			want: 3,
+		},
+		{
+			name: "add-5",
+			args: &event.OutputEventContext{Offset: 5},
+			want: 3,
 		},
 	}
-
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			plugin.RegisterPlugin(tt.name, tt.args)
-			assertPanic(t, tt.name, nil, tt.panic)
+			buffer.Add(tt.args)
+			if got := buffer.Len(); got != tt.want {
+				t.Errorf("Buffer Len() = %v, want %v", got, tt.want)
+			}
 		})
 	}
 }
 
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
-	defer func() {
-		if r := recover(); r != nil && !existPanic {
-			t.Errorf("the plugin %s is not pass", name)
-		}
-	}()
-	api.GetClient(name, config)
+func init() {
+	log.Init(&log.LoggerConfig{})
 }
diff --git a/internal/satellite/module/clientmanager.go b/internal/satellite/module/clientmanager.go
new file mode 100644
index 0000000..3d98cac
--- /dev/null
+++ b/internal/satellite/module/clientmanager.go
@@ -0,0 +1,182 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package module
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/constant"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/config"
+	client "github.com/apache/skywalking-satellite/plugins/client/api"
+)
+
+// The specific client statuses.
+const (
+	_ ClientStatus = iota
+	Connected
+	Disconnect
+)
+
+// ClientStatus represents the status of the client.
+type ClientStatus int8
+
+// ClientManager is a module plugin to control the connection with the outer service.
+type ClientManager struct {
+	sync.Mutex
+
+	// config
+	config *config.ClientManagerConfig
+
+	// dependency plugins
+	runningClient client.Client
+
+	// self components
+	listeners []chan ClientStatus // the sender client status listeners
+
+	// Metrics:
+	retryCount int64
+	status     ClientStatus
+}
+
+func (c *ClientManager) Name() string {
+	return constant.ClientManagerModule
+}
+
+func (c *ClientManager) Description() string {
+	return "keep connection with external services, such as Kafka and OAP."
+}
+
+func (c *ClientManager) Config() config.ModuleConfig {
+	return c.config
+}
+
+// Init ClientManager, dependency plugins and self components.
+func (c *ClientManager) Init(cfg config.ModuleConfig) {
+	log.Logger.Infof("%s module of %s namespace is being initialized", c.Name(), c.config.NameSpace())
+	c.config = cfg.(*config.ClientManagerConfig)
+	c.runningClient = client.GetClient(c.config.ClientConfig)
+	c.listeners = []chan ClientStatus{}
+}
+
+// Prepare makes the connection with external services, such as Kafka and OAP.
+func (c *ClientManager) Prepare() error {
+	log.Logger.Infof("%s module of %s namespace is in preparing stage", c.Name(), c.config.NameSpace())
+	return c.initializeClient()
+}
+
+// Boot start ClientManager to maintain the connection with external services.
+func (c *ClientManager) Boot(ctx context.Context) {
+	log.Logger.Infof("%s module of %s namespace is running", c.Name(), c.config.NameSpace())
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		timeTicker := time.NewTicker(time.Duration(c.config.RetryInterval) * time.Millisecond)
+		for {
+			select {
+			case <-timeTicker.C:
+				if err := c.connectClient(); err != nil {
+					log.Logger.Errorf("cannot make a connection with the %s client", c.runningClient.Name())
+				}
+			case <-ctx.Done():
+				c.Shutdown()
+				return
+			}
+		}
+	}()
+	wg.Wait()
+}
+
+// Shutdown close the connection and listeners.
+func (c *ClientManager) Shutdown() {
+	log.Logger.Infof("%s module of %s namespace is closing", c.Name(), c.config.NameSpace())
+	for _, listener := range c.listeners {
+		close(listener)
+	}
+	if err := c.runningClient.Close(); err != nil {
+		log.Logger.Errorf("an error occurring when closing %s client: %v", c.runningClient.Name(), err)
+	}
+}
+
+// RegisterListener adds the listener to listen to the status of the client.
+func (c *ClientManager) RegisterListener(listener chan ClientStatus) {
+	c.Lock()
+	defer c.Unlock()
+	c.listeners = append(c.listeners, listener)
+}
+
+// GetClient returns a connected client when . Otherwise, would return a nil client.
+func (c *ClientManager) GetConnectedClient() interface{} {
+	return c.runningClient.GetConnectedClient()
+}
+
+// ReportError reports the client is disconnect.
+func (c *ClientManager) ReportError() {
+	c.Lock()
+	defer c.Unlock()
+	if c.status == Connected {
+		c.status = Disconnect
+		c.notify()
+	}
+}
+
+// initializeClient initialize the connection with external services and retry one time when initialize failed.
+func (c *ClientManager) initializeClient() error {
+	c.Lock()
+	defer c.Unlock()
+	if err := c.connectClient(); err != nil {
+		log.Logger.Infof("preparing to reconnect with %s client,retrying in 10s", c.runningClient.Name())
+		time.Sleep(10 * time.Second)
+		return c.connectClient()
+	}
+	return nil
+}
+
+// connectClient would make a connection with external services, such as Kafka and OAP. When successfully connected,
+// ClientManager would notify the connected status to all senders.
+func (c *ClientManager) connectClient() error {
+	c.Lock()
+	defer c.Unlock()
+	if c.runningClient.IsConnected() {
+		return nil
+	}
+	log.Logger.Infof("preparing to connect with %s client", c.runningClient.Name())
+	if c.runningClient.IsConnected() {
+		return nil
+	}
+	c.retryCount++
+	err := c.runningClient.Connect()
+	if err == nil {
+		c.status = Connected
+		c.notify()
+		log.Logger.Infof("successfully connected to %s client", c.runningClient.Name())
+	}
+	return err
+}
+
+// notify the current status to all listeners.
+func (c *ClientManager) notify() {
+	c.Lock()
+	defer c.Unlock()
+	for _, listener := range c.listeners {
+		listener <- c.status
+	}
+}
diff --git a/internal/satellite/module/definition.go b/internal/satellite/module/definition.go
new file mode 100644
index 0000000..6348f03
--- /dev/null
+++ b/internal/satellite/module/definition.go
@@ -0,0 +1,98 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package module
+
+import (
+	"context"
+	"reflect"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/internal/satellite/config"
+)
+
+const (
+	// If a module is stored in the sharing namespace, the modules would be shared with all namespaces in moduleContainer.
+	SharingNamespaceName = "sharing"
+)
+
+// TODO add metrics func
+// Service id a custom plugin interface, which defines the processing.
+type Service interface {
+	plugin.Plugin
+
+	// Config returns the using ModuleConfig.
+	Config() config.ModuleConfig
+
+	// Init initialize the Module and register the instance to the registry.
+	// In this stage, no components is running.
+	Init(config config.ModuleConfig)
+
+	// Prepare would inject the dependency module and do dependency initialization.
+	Prepare() error
+
+	// Boot would start the module and return error when started failed. When a stop signal received
+	// or an exception occurs, the shutdown function would be called.
+	Boot(ctx context.Context)
+
+	// Shutdown could do some clean job to close Service.
+	Shutdown()
+}
+
+var moduleContainer map[string]Service
+
+// NewModuleService returns a new initialized Service and register it to the module container.
+func NewModuleService(cfg config.ModuleConfig) {
+	_ = plugin.Get(reflect.TypeOf((*Service)(nil)).Elem(), cfg).(Service)
+}
+
+// GetRunningModule returns a running Service.
+func GetRunningModule(namespace, moduleName string) Service {
+	if moduleService, ok := moduleContainer[namespace+moduleName]; ok {
+		return moduleService
+	}
+	if moduleService, ok := moduleContainer[SharingNamespaceName+moduleName]; ok {
+		return moduleService
+	}
+	return nil
+}
+
+func GetModuleContainer() map[string]Service {
+	return moduleContainer
+}
+
+// Register the Service category to the plugin registry.
+func init() {
+	moduleContainer = make(map[string]Service)
+	plugin.RegisterPluginCategory(reflect.TypeOf((*Service)(nil)).Elem(),
+		func(cfg interface{}) string {
+			// Get plugin name to find the specific Service plugin.
+			return cfg.(config.ModuleConfig).ModuleName()
+		},
+		func(plugin plugin.Plugin, cfg interface{}) {
+			// Initialize Service.
+			ms := plugin.(Service)
+			mc := cfg.(config.ModuleConfig)
+			ms.Init(mc)
+		},
+		func(plugin plugin.Plugin) {
+			// Register the initialized Service to the moduleContainer.
+			ms := plugin.(Service)
+			moduleContainer[ms.Config().NameSpace()+ms.Config().ModuleName()] = ms
+		},
+	)
+}
diff --git a/internal/satellite/module/gatherer.go b/internal/satellite/module/gatherer.go
new file mode 100644
index 0000000..765f6cc
--- /dev/null
+++ b/internal/satellite/module/gatherer.go
@@ -0,0 +1,112 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package module
+
+import (
+	"context"
+	"sync"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/constant"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/config"
+	collector "github.com/apache/skywalking-satellite/plugins/collector/api"
+	queue "github.com/apache/skywalking-satellite/plugins/queue/api"
+)
+
+// Gatherer is the APM data collection module in Satellite.
+type Gatherer struct {
+	// config
+	config *config.GathererConfig
+
+	// dependency plugins
+	runningCollector collector.Collector
+	runningQueue     queue.Queue
+}
+
+func (g *Gatherer) Name() string {
+	return constant.GathererModule
+}
+
+func (g *Gatherer) Description() string {
+	return "gatherer is the APM data collection module in Satellite, which supports Log, Trace, and Metrics scopes."
+}
+
+func (g *Gatherer) Config() config.ModuleConfig {
+	return g.config
+}
+
+// Init Gatherer, dependency plugins.
+func (g *Gatherer) Init(cfg config.ModuleConfig) {
+	log.Logger.Infof("%s module of %s namespace is being initialized", g.Name(), g.config.NameSpace())
+	g.config = cfg.(*config.GathererConfig)
+	g.runningCollector = collector.GetCollector(g.config.CollectorConfig)
+	g.runningQueue = queue.GetQueue(g.config.CollectorConfig)
+}
+
+// Prepare starts Collector and Queue.
+func (g *Gatherer) Prepare() error {
+	log.Logger.Infof("%s module of %s namespace is in preparing stage", g.Name(), g.config.NameSpace())
+	if err := g.runningCollector.Prepare(); err != nil {
+		log.Logger.Infof("%s collector of %s namespace was failed to initialize", g.runningCollector.Name(), g.config.NameSpace())
+		return err
+	}
+
+	if err := g.runningQueue.Prepare(); err != nil {
+		log.Logger.Infof("the %s queue of %s namespace was failed to initialize", g.runningQueue.Name(), g.config.NameSpace())
+		return err
+	}
+	return nil
+}
+
+// Boot fetches Collector input data and pushes it into Queue.
+func (g *Gatherer) Boot(ctx context.Context) {
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		for {
+			select {
+			case e := <-g.runningCollector.EventChannel():
+				err := g.runningQueue.Publisher().Enqueue(e)
+				if err != nil {
+					// todo add abandonedCount metrics
+					log.Logger.Errorf("cannot put event into queue in %s namespace, error is: %v", g.config.NameSpace(), err)
+				}
+			case <-ctx.Done():
+				g.Shutdown()
+				return
+			}
+		}
+	}()
+	wg.Wait()
+}
+
+// Shutdown close Collector and Queue.
+func (g *Gatherer) Shutdown() {
+	log.Logger.Infof("%s module of %s namespace is closing", g.Name(), g.config.NameSpace())
+	if err := g.runningCollector.Close(); err != nil {
+		log.Logger.Errorf("failure occurs when closing %s collector  in %s namespace, error is: %v", g.runningCollector.Name(), g.config.NameSpace(), err)
+	}
+	if err := g.runningQueue.Close(); err != nil {
+		log.Logger.Errorf("failure occurs when closing %s queue  in %s namespace, error is: %v", g.runningQueue.Name(), g.config.NameSpace(), err)
+	}
+}
+
+// Ack some events according to the startOffset and the BatchSize
+func (g *Gatherer) Ack(startOffset int64, batchSize int) {
+	<-g.runningQueue.Ack(startOffset, batchSize)
+}
diff --git a/internal/satellite/module/processor.go b/internal/satellite/module/processor.go
new file mode 100644
index 0000000..e1cdf96
--- /dev/null
+++ b/internal/satellite/module/processor.go
@@ -0,0 +1,115 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package module
+
+import (
+	"context"
+	"sync"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/constant"
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/config"
+	filter "github.com/apache/skywalking-satellite/plugins/filter/api"
+)
+
+// Processor is the processing module in Satellite.
+type Processor struct {
+	// config
+	config *config.ProcessorConfig
+
+	// dependency plugins
+	runningFilters []filter.Filter
+
+	// dependency modules
+	sender   *Sender
+	gatherer *Gatherer
+}
+
+func (p *Processor) Name() string {
+	return constant.ProcessorModule
+}
+
+func (p *Processor) Description() string {
+	return "processor module is the core processing module in Satellite"
+}
+
+func (p *Processor) Config() config.ModuleConfig {
+	return p.config
+}
+
+// Init Processor and dependency plugins
+func (p *Processor) Init(cfg config.ModuleConfig) {
+	log.Logger.Infof("%s module of %s namespace is being initialized", p.Name(), p.config.NameSpace())
+	p.config = cfg.(*config.ProcessorConfig)
+	p.runningFilters = []filter.Filter{}
+	for _, c := range p.config.FilterConfig {
+		p.runningFilters = append(p.runningFilters, filter.GetFilter(c))
+	}
+}
+
+// Prepare inject the dependency modules.
+func (p *Processor) Prepare() error {
+	log.Logger.Infof("%s module of %s namespace is in preparing stage", p.Name(), p.config.NameSpace())
+	p.sender = GetRunningModule(p.config.NameSpace(), constant.SenderModule).(*Sender)
+	p.gatherer = GetRunningModule(p.config.NameSpace(), constant.GathererModule).(*Gatherer)
+	return nil
+}
+
+// BOOT fetches the data of Queue, does a series of processing, and then sends to Sender.
+func (p *Processor) Boot(ctx context.Context) {
+	log.Logger.Infof("%s module of %s namespace is running", p.Name(), p.config.NameSpace())
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	go func() {
+		defer wg.Done()
+		for {
+			// fetch a new event from Queue of Gatherer
+			e, offset, err := p.gatherer.runningQueue.Consumer().Dequeue()
+			if err != nil {
+				// todo add metrics
+				log.Logger.Errorf("cannot get event from queue in %s namespace, error is: %v",
+					p.config.NameSpace(), err)
+				continue
+			}
+			c := &event.OutputEventContext{
+				Offset:  offset,
+				Context: make(map[string]event.Event),
+			}
+			// processing the event with filters, that put the necessary events to OutputEventContext.
+			c.Put(e)
+			for _, f := range p.runningFilters {
+				f.Process(c)
+			}
+
+			select {
+			// put result input the Input channel of Sender
+			case p.sender.Input <- c:
+			case <-ctx.Done():
+				p.Shutdown()
+				return
+			}
+		}
+	}()
+	wg.Wait()
+}
+
+func (p *Processor) Shutdown() {
+	log.Logger.Infof("%s module of %s namespace is closing", p.Name(), p.config.NameSpace())
+}
diff --git a/internal/satellite/module/sender.go b/internal/satellite/module/sender.go
new file mode 100644
index 0000000..54ac3ac
--- /dev/null
+++ b/internal/satellite/module/sender.go
@@ -0,0 +1,181 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package module
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/constant"
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/module/buffer"
+	fallbacker "github.com/apache/skywalking-satellite/plugins/fallbacker/api"
+	forwarder "github.com/apache/skywalking-satellite/plugins/forwarder/api"
+)
+
+// Sender is the forward module in Satellite.
+type Sender struct {
+	// config
+	config *config.SenderConfig
+
+	// dependency plugins
+	runningForwarders []forwarder.Forwarder
+	runningFallbacker fallbacker.Fallbacker
+
+	// dependency modules
+	gatherer      *Gatherer
+	clientManager *ClientManager
+
+	// self components
+	Input        chan *event.OutputEventContext // logic input channel
+	input        chan *event.OutputEventContext // physical input channel
+	listener     chan ClientStatus              // client status listener
+	flushChannel chan *buffer.BatchBuffer       // forwarder flush channel
+	buffer       *buffer.BatchBuffer            // cache the downstream input data
+}
+
+func (s *Sender) Name() string {
+	return constant.SenderModule
+}
+
+func (s *Sender) Description() string {
+	return "forward the input events to external services, such as Kafka and OAP"
+}
+
+func (s *Sender) Config() config.ModuleConfig {
+	return s.config
+}
+
+// Init Sender, dependency plugins and self components.
+func (s *Sender) Init(cfg config.ModuleConfig) {
+	log.Logger.Infof("%s module of %s namespace is being initialized", s.Name(), s.config.NameSpace())
+	s.config = cfg.(*config.SenderConfig)
+	s.runningFallbacker = fallbacker.GetFallbacker(s.config.FallbackerConfig)
+	s.runningForwarders = []forwarder.Forwarder{}
+	for _, c := range s.config.ForwardersConfig {
+		s.runningForwarders = append(s.runningForwarders, forwarder.GetForwarder(c))
+	}
+	s.input = make(chan *event.OutputEventContext)
+	s.Input = s.input
+	s.listener = make(chan ClientStatus)
+	s.flushChannel = make(chan *buffer.BatchBuffer, 1)
+	s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
+}
+
+// Prepare inject the dependency modules and register the client status listener to the clientManager.
+func (s *Sender) Prepare() error {
+	log.Logger.Infof("%s module of %s namespace is in preparing stage", s.Name(), s.config.NameSpace())
+	s.clientManager = GetRunningModule(s.config.NameSpace(), constant.ClientManagerModule).(*ClientManager)
+	s.gatherer = GetRunningModule(s.config.NameSpace(), constant.GathererModule).(*Gatherer)
+	s.clientManager.RegisterListener(s.listener)
+	return nil
+}
+
+// Boot fetches the downstream input data and forward to external services, such as Kafka and OAP receiver.
+func (s *Sender) Boot(ctx context.Context) {
+	log.Logger.Infof("%s module of %s namespace is running", s.Name(), s.config.NameSpace())
+	var wg sync.WaitGroup
+	wg.Add(2)
+	// 1. keep fetching the downstream data when client connected, and put it into BatchBuffer.
+	// 2. When reaches the buffer limit or receives a timer flush signal, and put BatchBuffer into flushChannel.
+	go func() {
+		defer wg.Done()
+		timeTicker := time.NewTicker(time.Duration(s.config.FlushTime) * time.Millisecond)
+		for {
+			select {
+			case status := <-s.listener:
+				switch status {
+				case Connected:
+					log.Logger.Infof("%s module of %s namespace is notified the connection is connected", s.Name(), s.config.NameSpace())
+					s.Input = s.input
+				case Disconnect:
+					log.Logger.Infof("%s module of %s namespace is notified the connection is disconnected", s.Name(), s.config.NameSpace())
+					s.Input = nil
+				}
+			case <-timeTicker.C:
+				if s.buffer.Len() > s.config.MinFlushEvents {
+					s.flushChannel <- s.buffer
+					s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
+				}
+			case e := <-s.Input:
+				s.buffer.Add(e)
+				if s.buffer.Len() == s.config.MaxBufferSize {
+					s.flushChannel <- s.buffer
+					s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
+				}
+			case <-ctx.Done():
+				s.Input = nil
+				return
+			}
+		}
+	}()
+	// Keep fetching BatchBuffer to forward.
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case b := <-s.flushChannel:
+				s.consume(b)
+			case <-ctx.Done():
+				s.Shutdown()
+				return
+			}
+		}
+	}()
+	wg.Wait()
+}
+
+// Shutdown closes the channels and tries to force forward the events in the buffer.
+func (s *Sender) Shutdown() {
+	log.Logger.Infof("%s module of %s namespace is closing", s.Name(), s.config.NameSpace())
+	close(s.input)
+	for buf := range s.flushChannel {
+		s.consume(buf)
+	}
+	s.consume(s.buffer)
+	close(s.flushChannel)
+}
+
+// consume would forward the events by type and ack this batch.
+func (s *Sender) consume(batch *buffer.BatchBuffer) {
+	log.Logger.Infof("%s module of %s namespace is flushing a new batch buffer. the start offset is %d, and the batch size is %d",
+		s.Name(), s.config.NameSpace(), batch.First(), batch.BatchSize())
+	var events = make(map[event.Type]event.BatchEvents)
+	for i := 0; i < batch.Len(); i++ {
+		eventContext := batch.Buf()[i]
+		for _, e := range eventContext.Context {
+			if e.IsRemote() {
+				events[e.Type()] = append(events[e.Type()], e)
+			}
+		}
+	}
+
+	for _, f := range s.runningForwarders {
+		for t, batchEvents := range events {
+			if f.ForwardType() == t {
+				s.runningFallbacker.FallBack(batchEvents, s.clientManager.GetConnectedClient(), f.Forward, func() {
+					s.clientManager.ReportError()
+				})
+			}
+		}
+	}
+	s.gatherer.Ack(batch.First(), batch.BatchSize())
+}
diff --git a/internal/pkg/plugin/define.go b/plugins/README.md
similarity index 81%
rename from internal/pkg/plugin/define.go
rename to plugins/README.md
index ad95698..c67bca2 100644
--- a/internal/pkg/plugin/define.go
+++ b/plugins/README.md
@@ -1,22 +1,4 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package plugin
-
+```
 // The following graph illustrates the relationship between different plugin interface in api package.
 //
 //
@@ -75,10 +57,4 @@ package plugin
 // There are four stages in the lifecycle of Satellite plugins, which are the initial phase,
 // preparing phase, running phase, and closing phase. In the running phase, each plugin has
 // its own interface definition. However, the other three phases have to be defined uniformly.
-
-type Plugin interface {
-	// Description returns the description of the specific plugin.
-	Description() string
-	// Init initialize the specific plugin.
-	InitPlugin(config map[string]interface{})
-}
+```
\ No newline at end of file
diff --git a/plugins/client/api/client.go b/plugins/client/api/client.go
index 51d2b65..33f61a6 100644
--- a/plugins/client/api/client.go
+++ b/plugins/client/api/client.go
@@ -27,21 +27,24 @@ import (
 type Client interface {
 	plugin.Plugin
 
-	// Prepare would make connection with outer service.
-	Prepare()
+	// Init would make connection with outer service.
+	Connect() error
+
+	// Return the status of the client.
+	IsConnected() bool
+
 	// GetConnection returns the connected client to publish events.
 	GetConnectedClient() interface{}
+
 	// Close the connection with outer service.
-	Close()
+	Close() error
 }
 
-var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
-
 // Get client plugin.
-func GetClient(pluginName string, config map[string]interface{}) Client {
-	return plugin.Get(ClientCategory, pluginName, config).(Client)
+func GetClient(config plugin.DefaultConfig) Client {
+	return plugin.Get(reflect.TypeOf((*Client)(nil)).Elem(), config).(Client)
 }
 
 func init() {
-	plugin.AddPluginCategory(ClientCategory)
+	plugin.RegisterPluginCategory(reflect.TypeOf((*Client)(nil)).Elem(), nil, nil, nil)
 }
diff --git a/plugins/client/example/client.go b/plugins/client/example/client.go
deleted file mode 100644
index 77b29c7..0000000
--- a/plugins/client/example/client.go
+++ /dev/null
@@ -1,67 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-type demoClient struct {
-	a string
-}
-type demoClient2 struct {
-	a string
-}
-type demoClient3 struct {
-	a string
-}
-
-func (d demoClient) Description() string {
-	panic("implement me")
-}
-
-func (d demoClient) InitPlugin(config map[string]interface{}) {
-
-}
-
-func (d demoClient) Prepare() {
-	panic("implement me")
-}
-
-func (d demoClient) GetConnectedClient() interface{} {
-	panic("implement me")
-}
-
-func (d demoClient) Close() {
-	panic("implement me")
-}
-
-func (d *demoClient2) Description() string {
-	panic("implement me")
-}
-
-func (d *demoClient2) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoClient2) Prepare() {
-	panic("implement me")
-}
-
-func (d *demoClient2) GetConnectedClient() interface{} {
-	panic("implement me")
-}
-
-func (d *demoClient2) Close() {
-	panic("implement me")
-}
diff --git a/plugins/collector/api/collector.go b/plugins/collector/api/collector.go
index 374d63a..a756974 100644
--- a/plugins/collector/api/collector.go
+++ b/plugins/collector/api/collector.go
@@ -27,7 +27,7 @@ import (
 //   Init()     Initial stage: Init plugin by config
 //    ||
 //    \/
-//   Prepare()   Preparing stage: Prepare the collector, such as build connection with SkyWalking javaagent.
+//   Init()   Preparing stage: Init the collector, such as build connection with SkyWalking javaagent.
 //    ||
 //    \/
 //   Next()     Running stage: When Collector collect a data, the data would be fetched by the upstream
@@ -38,22 +38,21 @@ import (
 // Collector is a plugin interface, that defines new collectors.
 type Collector interface {
 	plugin.Plugin
-
-	// Prepare creates a listen or reader to gather data.
-	Prepare()
+	// Prepare creates a listener or reader to gather APM data.
+	Prepare() error
 	// Next return the data from the input.
-	Next() (event.SerializableEvent, error)
+	EventChannel() <-chan event.SerializableEvent
 	// Close would close collector.
-	Close()
+	Close() error
 }
 
 var CollectorCategory = reflect.TypeOf((*Collector)(nil)).Elem()
 
 // Get collector plugin.
-func GetCollector(pluginName string, config map[string]interface{}) Collector {
-	return plugin.Get(CollectorCategory, pluginName, config).(Collector)
+func GetCollector(config plugin.DefaultConfig) Collector {
+	return plugin.Get(CollectorCategory, config).(Collector)
 }
 
 func init() {
-	plugin.AddPluginCategory(CollectorCategory)
+	plugin.RegisterPluginCategory(CollectorCategory, nil, nil, nil)
 }
diff --git a/plugins/collector/example/collector.go b/plugins/collector/example/collector.go
deleted file mode 100644
index 40b9b8b..0000000
--- a/plugins/collector/example/collector.go
+++ /dev/null
@@ -1,70 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import "github.com/apache/skywalking-satellite/internal/pkg/event"
-
-type demoCollector struct {
-	a string
-}
-
-type demoCollector2 struct {
-	a string
-}
-
-type demoCollector3 struct {
-	a string
-}
-
-func (d *demoCollector) Description() string {
-	panic("implement me")
-}
-
-func (d *demoCollector) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoCollector) Prepare() {
-	panic("implement me")
-}
-
-func (d *demoCollector) Next() (event.SerializableEvent, error) {
-	panic("implement me")
-}
-
-func (d *demoCollector) Close() {
-	panic("implement me")
-}
-
-func (d demoCollector2) Description() string {
-	panic("implement me")
-}
-
-func (d demoCollector2) InitPlugin(config map[string]interface{}) {
-}
-
-func (d demoCollector2) Prepare() {
-	panic("implement me")
-}
-
-func (d demoCollector2) Next() (event.SerializableEvent, error) {
-	panic("implement me")
-}
-
-func (d demoCollector2) Close() {
-	panic("implement me")
-}
diff --git a/plugins/collector/example/collector_test.go b/plugins/collector/example/collector_test.go
deleted file mode 100644
index e73f3bd..0000000
--- a/plugins/collector/example/collector_test.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
-	"testing"
-
-	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
-	"github.com/apache/skywalking-satellite/plugins/collector/api"
-)
-
-func Test_Register(t *testing.T) {
-	tests := []struct {
-		name  string
-		args  interface{}
-		panic bool
-	}{
-		{
-			name: "demoCollector",
-			args: &demoCollector{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoCollector2",
-			args: demoCollector2{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoCollector3",
-			args: demoCollector3{
-				a: "s",
-			},
-			panic: true,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			plugin.RegisterPlugin(tt.name, tt.args)
-			assertPanic(t, tt.name, nil, tt.panic)
-		})
-	}
-}
-
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
-	defer func() {
-		if r := recover(); r != nil && !existPanic {
-			t.Errorf("the plugin %s is not pass", name)
-		}
-	}()
-	api.GetCollector(name, config)
-}
diff --git a/plugins/fallbacker/api/fallbacker.go b/plugins/fallbacker/api/fallbacker.go
index bde56ef..91dea45 100644
--- a/plugins/fallbacker/api/fallbacker.go
+++ b/plugins/fallbacker/api/fallbacker.go
@@ -22,23 +22,24 @@ import (
 
 	"github.com/apache/skywalking-satellite/internal/pkg/event"
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/plugins/forwarder/api"
 )
 
 // Fallbacker is a plugin interface, that defines some fallback strategies.
 type Fallbacker interface {
 	plugin.Plugin
-
 	//  FallBack returns nil when finishing a successful process and returns a new Fallbacker when failure.
-	FallBack(batch event.BatchEvents) Fallbacker
+	FallBack(batch event.BatchEvents, connection interface{}, forward api.ForwardFunc, callback DisconnectionCallback) Fallbacker
 }
 
-var FallbackerCategory = reflect.TypeOf((*Fallbacker)(nil)).Elem()
+type DisconnectionCallback func()
 
 // Get Fallbacker plugin.
-func GetFallbacker(pluginName string, config map[string]interface{}) Fallbacker {
-	return plugin.Get(FallbackerCategory, pluginName, config).(Fallbacker)
+func GetFallbacker(config plugin.DefaultConfig) Fallbacker {
+	return plugin.Get(reflect.TypeOf((*Fallbacker)(nil)).Elem(), config).(Fallbacker)
 }
 
+// init register the Fallbacker interface
 func init() {
-	plugin.AddPluginCategory(FallbackerCategory)
+	plugin.RegisterPluginCategory(reflect.TypeOf((*Fallbacker)(nil)).Elem(), nil, nil, nil)
 }
diff --git a/plugins/fallbacker/example/fallbacker.go b/plugins/fallbacker/example/fallbacker.go
deleted file mode 100644
index b933e96..0000000
--- a/plugins/fallbacker/example/fallbacker.go
+++ /dev/null
@@ -1,57 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
-	"github.com/apache/skywalking-satellite/internal/pkg/event"
-	"github.com/apache/skywalking-satellite/plugins/fallbacker/api"
-)
-
-type demoFallbacker struct {
-	a string
-}
-
-type demoFallbacker2 struct {
-	a string
-}
-
-type demoFallbacker3 struct {
-	a string
-}
-
-func (d *demoFallbacker) Description() string {
-	panic("implement me")
-}
-
-func (d *demoFallbacker) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoFallbacker) FallBack(batch event.BatchEvents) api.Fallbacker {
-	panic("implement me")
-}
-
-func (d demoFallbacker2) Description() string {
-	panic("implement me")
-}
-
-func (d demoFallbacker2) InitPlugin(config map[string]interface{}) {
-}
-
-func (d demoFallbacker2) FallBack(batch event.BatchEvents) api.Fallbacker {
-	panic("implement me")
-}
diff --git a/plugins/fallbacker/example/fallbacker_test.go b/plugins/fallbacker/example/fallbacker_test.go
deleted file mode 100644
index f5f445f..0000000
--- a/plugins/fallbacker/example/fallbacker_test.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
-	"testing"
-
-	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
-	"github.com/apache/skywalking-satellite/plugins/fallbacker/api"
-)
-
-func Test_Register(t *testing.T) {
-	tests := []struct {
-		name  string
-		args  interface{}
-		panic bool
-	}{
-		{
-			name: "demoFallbacker",
-			args: &demoFallbacker{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoFallbacker2",
-			args: demoFallbacker2{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoFallbacker3",
-			args: demoFallbacker3{
-				a: "s",
-			},
-			panic: true,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			plugin.RegisterPlugin(tt.name, tt.args)
-			assertPanic(t, tt.name, nil, tt.panic)
-		})
-	}
-}
-
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
-	defer func() {
-		if r := recover(); r != nil && !existPanic {
-			t.Errorf("the plugin %s is not pass", name)
-		}
-	}()
-	api.GetFallbacker(name, config)
-}
diff --git a/plugins/fallbacker/timer/timer_fallbacker.go b/plugins/fallbacker/timer/timer_fallbacker.go
new file mode 100644
index 0000000..b86981e
--- /dev/null
+++ b/plugins/fallbacker/timer/timer_fallbacker.go
@@ -0,0 +1,62 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package timer
+
+import (
+	"time"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/plugins/forwarder/api"
+)
+
+// Fallbacker is a timer fallbacker when forward fails. `latencyFactor` is the standard retry duration,
+// and the time for each retry is expanded by 2 times until the number of retries reaches the maximum.
+type Fallbacker struct {
+	maxTimes      int `mapstructure:"max_times"`
+	latencyFactor int `mapstructure:"latency_factor"`
+}
+
+func (t *Fallbacker) Name() string {
+	return "timer-fallbacker"
+}
+
+func (t *Fallbacker) Description() string {
+	return "this is a timer trigger when forward fails."
+}
+
+func (t *Fallbacker) DefaultConfig() string {
+	return `
+max_times: 3
+latency_factor: 2000
+`
+}
+
+func (t *Fallbacker) FallBack(batch event.BatchEvents, connection interface{}, forward api.ForwardFunc) {
+	if err := forward(connection, batch); err != nil {
+		count := 1
+		currentLatency := count * t.latencyFactor
+		for count < t.maxTimes {
+			time.Sleep(time.Duration(currentLatency) * time.Millisecond)
+			if err := forward(connection, batch); err != nil {
+				currentLatency *= 2
+			} else {
+				break
+			}
+		}
+	}
+}
diff --git a/plugins/filter/api/filter.go b/plugins/filter/api/filter.go
index 0ed43db..bc8e9bb 100644
--- a/plugins/filter/api/filter.go
+++ b/plugins/filter/api/filter.go
@@ -34,17 +34,17 @@ import (
 type Filter interface {
 	plugin.Plugin
 
-	// Process produces a new event by processing incoming event.
-	Process(in event.Event) event.Event
+	// Process would fetch the needed event
+	Process(context *event.OutputEventContext)
 }
 
 var FilterCategory = reflect.TypeOf((*Filter)(nil)).Elem()
 
 // Get filter plugin.
-func GetFilter(pluginName string, config map[string]interface{}) Filter {
-	return plugin.Get(FilterCategory, pluginName, config).(Filter)
+func GetFilter(config plugin.DefaultConfig) Filter {
+	return plugin.Get(FilterCategory, config).(Filter)
 }
 
 func init() {
-	plugin.AddPluginCategory(FilterCategory)
+	plugin.RegisterPluginCategory(FilterCategory, nil, nil, nil)
 }
diff --git a/plugins/filter/example/filter_test.go b/plugins/filter/example/filter_test.go
deleted file mode 100644
index aae4371..0000000
--- a/plugins/filter/example/filter_test.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
-	"testing"
-
-	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
-	"github.com/apache/skywalking-satellite/plugins/filter/api"
-)
-
-func Test_Register(t *testing.T) {
-	tests := []struct {
-		name  string
-		args  interface{}
-		panic bool
-	}{
-		{
-			name: "demoFilter",
-			args: &demoFilter{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoFilter2",
-			args: demoFilter2{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoFilter3",
-			args: demoFilter3{
-				a: "s",
-			},
-			panic: true,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			plugin.RegisterPlugin(tt.name, tt.args)
-			assertPanic(t, tt.name, nil, tt.panic)
-		})
-	}
-}
-
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
-	defer func() {
-		if r := recover(); r != nil && !existPanic {
-			t.Errorf("the plugin %s is not pass", name)
-		}
-	}()
-	api.GetFilter(name, config)
-}
diff --git a/plugins/forwarder/api/forwarder.go b/plugins/forwarder/api/forwarder.go
index 9e99f14..b0509cd 100644
--- a/plugins/forwarder/api/forwarder.go
+++ b/plugins/forwarder/api/forwarder.go
@@ -24,34 +24,25 @@ import (
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
 
-//   Init()     Initiating stage: Init plugin by config
-//    ||
-//    \/
-//   Prepare()   Preparing stage: Prepare the Forwarder, such as get remote client.
-//    ||
-//    \/
-//   Forward()  Running stage: Forward the batch events
-//    ||
-//    \/
-//   Close()    Closing stage: Close the Collector, such as close connection with SkyWalking javaagent.
-
 // Forwarder is a plugin interface, that defines new forwarders.
 type Forwarder interface {
 	plugin.Plugin
 
 	// Forward the batch events to the external services, such as Kafka MQ and SkyWalking OAP cluster.
-	Forward(batch event.BatchEvents)
-
-	// ForwardType returns the supporting event type that could be forwarded.
+	Forward(connection interface{}, batch event.BatchEvents) error
+	// ForwardType returns the supported event type.
 	ForwardType() event.Type
 }
 
-var ForwarderCategory = reflect.TypeOf((*Forwarder)(nil)).Elem()
+// ForwardFunc represent the Forward() in Forwarder
+type ForwardFunc func(connection interface{}, batch event.BatchEvents) error
 
-func GetForwarder(pluginName string, config map[string]interface{}) Forwarder {
-	return plugin.Get(ForwarderCategory, pluginName, config).(Forwarder)
+// GetForwarder returns an initialized forwarder plugin.
+func GetForwarder(config map[string]interface{}) Forwarder {
+	return plugin.Get(reflect.TypeOf((*Forwarder)(nil)).Elem(), config).(Forwarder)
 }
 
+// init register the Forwarder interface
 func init() {
-	plugin.AddPluginCategory(ForwarderCategory)
+	plugin.RegisterPluginCategory(reflect.TypeOf((*Forwarder)(nil)).Elem(), nil, nil, nil)
 }
diff --git a/plugins/forwarder/example/forwarder.go b/plugins/forwarder/example/forwarder.go
deleted file mode 100644
index 637f9e1..0000000
--- a/plugins/forwarder/example/forwarder.go
+++ /dev/null
@@ -1,62 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import "github.com/apache/skywalking-satellite/internal/pkg/event"
-
-type demoForwarder struct {
-	a string
-}
-
-type demoForwarder2 struct {
-	a string
-}
-
-type demoForwarder3 struct {
-	a string
-}
-
-func (d *demoForwarder) Description() string {
-	panic("implement me")
-}
-
-func (d *demoForwarder) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoForwarder) Forward(batch event.BatchEvents) {
-	panic("implement me")
-}
-
-func (d demoForwarder2) Description() string {
-	panic("implement me")
-}
-
-func (d demoForwarder2) InitPlugin(config map[string]interface{}) {
-}
-
-func (d demoForwarder2) Forward(batch event.BatchEvents) {
-	panic("implement me")
-}
-
-func (d demoForwarder2) ForwardType() event.Type {
-	panic("implement me")
-}
-
-func (d *demoForwarder) ForwardType() event.Type {
-	panic("implement me")
-}
diff --git a/plugins/forwarder/example/forwarder_test.go b/plugins/forwarder/example/forwarder_test.go
deleted file mode 100644
index 28de35d..0000000
--- a/plugins/forwarder/example/forwarder_test.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
-	"testing"
-
-	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
-	"github.com/apache/skywalking-satellite/plugins/forwarder/api"
-)
-
-func Test_Register(t *testing.T) {
-	tests := []struct {
-		name  string
-		args  interface{}
-		panic bool
-	}{
-		{
-			name: "demoForwarder",
-			args: &demoForwarder{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoForwarder2",
-			args: demoForwarder2{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoForwarder3",
-			args: demoForwarder3{
-				a: "s",
-			},
-			panic: true,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			plugin.RegisterPlugin(tt.name, tt.args)
-			assertPanic(t, tt.name, nil, tt.panic)
-		})
-	}
-}
-
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
-	defer func() {
-		if r := recover(); r != nil && !existPanic {
-			t.Errorf("the plugin %s is not pass", name)
-		}
-	}()
-	api.GetForwarder(name, config)
-}
diff --git a/plugins/parser/api/parser.go b/plugins/parser/api/parser.go
index 3ca5ebe..cba3bca 100644
--- a/plugins/parser/api/parser.go
+++ b/plugins/parser/api/parser.go
@@ -38,12 +38,10 @@ type Parser interface {
 	ParseStr(str string) ([]event.SerializableEvent, error)
 }
 
-var ParserCategory = reflect.TypeOf((*Parser)(nil)).Elem()
-
-func GetParser(pluginName string, config map[string]interface{}) Parser {
-	return plugin.Get(ParserCategory, pluginName, config).(Parser)
+func GetParser(pluginName string, config plugin.DefaultConfig) Parser {
+	return plugin.Get(reflect.TypeOf((*Parser)(nil)).Elem(), config).(Parser)
 }
 
 func init() {
-	plugin.AddPluginCategory(ParserCategory)
+	plugin.RegisterPluginCategory(reflect.TypeOf((*Parser)(nil)).Elem(), nil, nil, nil)
 }
diff --git a/plugins/parser/example/parser.go b/plugins/parser/example/parser.go
deleted file mode 100644
index 839956f..0000000
--- a/plugins/parser/example/parser.go
+++ /dev/null
@@ -1,62 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import "github.com/apache/skywalking-satellite/internal/pkg/event"
-
-type demoParser struct {
-	a string
-}
-
-type demoParser2 struct {
-	a string
-}
-
-type demoParser3 struct {
-	a string
-}
-
-func (d *demoParser) Description() string {
-	panic("implement me")
-}
-
-func (d *demoParser) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoParser) ParseBytes(bytes []byte) ([]event.SerializableEvent, error) {
-	panic("implement me")
-}
-
-func (d *demoParser) ParseStr(str string) ([]event.SerializableEvent, error) {
-	panic("implement me")
-}
-
-func (d demoParser2) Description() string {
-	panic("implement me")
-}
-
-func (d demoParser2) InitPlugin(config map[string]interface{}) {
-}
-
-func (d demoParser2) ParseBytes(bytes []byte) ([]event.SerializableEvent, error) {
-	panic("implement me")
-}
-
-func (d demoParser2) ParseStr(str string) ([]event.SerializableEvent, error) {
-	panic("implement me")
-}
diff --git a/plugins/parser/example/parser_test.go b/plugins/parser/example/parser_test.go
deleted file mode 100644
index b577f7d..0000000
--- a/plugins/parser/example/parser_test.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
-	"testing"
-
-	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
-	"github.com/apache/skywalking-satellite/plugins/parser/api"
-)
-
-func Test_Register(t *testing.T) {
-	tests := []struct {
-		name  string
-		args  interface{}
-		panic bool
-	}{
-		{
-			name: "demoParser",
-			args: &demoParser{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoParser2",
-			args: demoParser2{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoParser3",
-			args: demoParser3{
-				a: "s",
-			},
-			panic: true,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			plugin.RegisterPlugin(tt.name, tt.args)
-			assertPanic(t, tt.name, nil, tt.panic)
-		})
-	}
-}
-
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
-	defer func() {
-		if r := recover(); r != nil && !existPanic {
-			t.Errorf("the plugin %s is not pass", name)
-		}
-	}()
-	api.GetParser(name, config)
-}
diff --git a/plugins/queue/api/queue.go b/plugins/queue/api/queue.go
index 9b95156..f6d4d69 100644
--- a/plugins/queue/api/queue.go
+++ b/plugins/queue/api/queue.go
@@ -28,6 +28,9 @@ import (
 type Queue interface {
 	plugin.Plugin
 
+	// Prepare creates the queue.
+	Prepare() error
+
 	// Publisher get the only publisher for the current queue.
 	Publisher() QueuePublisher
 
@@ -35,27 +38,30 @@ type Queue interface {
 	Consumer() QueueConsumer
 
 	// Close would close the queue.
-	Close()
+	Close() error
+
+	// Ack a batch
+	Ack(startOffset int64, batchSize int) chan struct{}
 }
 
 // QueuePublisher is a plugin interface, that defines new queue publishers.
 type QueuePublisher interface {
 	// Enqueue push a inputEvent into the queue.
-	Enqueue(event *event.SerializableEvent) error
+	Enqueue(event event.SerializableEvent) error
 }
 
 // QueueConsumer is a plugin interface, that defines new queue consumers.
 type QueueConsumer interface {
 	// Dequeue pop an event form the Queue. When the queue is empty, the method would be blocked.
-	Dequeue() (event *event.SerializableEvent, offset int64, err error)
+	Dequeue() (event event.SerializableEvent, offset int64, err error)
 }
 
 var QueueCategory = reflect.TypeOf((*Queue)(nil)).Elem()
 
-func GetQueue(pluginName string, config map[string]interface{}) Queue {
-	return plugin.Get(QueueCategory, pluginName, config).(Queue)
+func GetQueue(config plugin.DefaultConfig) Queue {
+	return plugin.Get(QueueCategory, config).(Queue)
 }
 
 func init() {
-	plugin.AddPluginCategory(QueueCategory)
+	plugin.RegisterPluginCategory(QueueCategory, nil, nil, nil)
 }
diff --git a/plugins/queue/example/queue.go b/plugins/queue/example/queue.go
deleted file mode 100644
index 5a3055a..0000000
--- a/plugins/queue/example/queue.go
+++ /dev/null
@@ -1,72 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
-	"github.com/apache/skywalking-satellite/plugins/queue/api"
-)
-
-type demoQueue struct {
-	a string
-}
-
-type demoQueue2 struct {
-	a string
-}
-
-type demoQueue3 struct {
-	a string
-}
-
-func (d *demoQueue) Description() string {
-	panic("implement me")
-}
-
-func (d *demoQueue) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoQueue) Publisher() api.QueuePublisher {
-	panic("implement me")
-}
-
-func (d *demoQueue) Consumer() api.QueueConsumer {
-	panic("implement me")
-}
-
-func (d *demoQueue) Close() {
-	panic("implement me")
-}
-
-func (d demoQueue2) Description() string {
-	panic("implement me")
-}
-
-func (d demoQueue2) InitPlugin(config map[string]interface{}) {
-}
-
-func (d demoQueue2) Publisher() api.QueuePublisher {
-	panic("implement me")
-}
-
-func (d demoQueue2) Consumer() api.QueueConsumer {
-	panic("implement me")
-}
-
-func (d demoQueue2) Close() {
-	panic("implement me")
-}
diff --git a/plugins/queue/example/queue_test.go b/plugins/queue/example/queue_test.go
deleted file mode 100644
index 0afd20d..0000000
--- a/plugins/queue/example/queue_test.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
-	"testing"
-
-	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
-	"github.com/apache/skywalking-satellite/plugins/queue/api"
-)
-
-func Test_Register(t *testing.T) {
-	tests := []struct {
-		name  string
-		args  interface{}
-		panic bool
-	}{
-		{
-			name: "demoQueue",
-			args: &demoQueue{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoQueue2",
-			args: demoQueue2{
-				a: "s",
-			},
-			panic: false,
-		},
-		{
-			name: "demoQueue3",
-			args: demoQueue3{
-				a: "s",
-			},
-			panic: true,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			plugin.RegisterPlugin(tt.name, tt.args)
-			assertPanic(t, tt.name, nil, tt.panic)
-		})
-	}
-}
-
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
-	defer func() {
-		if r := recover(); r != nil && !existPanic {
-			t.Errorf("the plugin %s is not pass", name)
-		}
-	}()
-	api.GetQueue(name, config)
-}