You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by li...@apache.org on 2020/12/08 11:13:53 UTC
[skywalking-satellite] 01/01: add satellite main structure
This is an automated email from the ASF dual-hosted git repository.
liujiapeng pushed a commit to branch boot-framework
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
commit c0939b1f361f071c4fe4ba668dfca0264397acfc
Author: Evan <ev...@outlook.com>
AuthorDate: Tue Dec 8 19:13:07 2020 +0800
add satellite main structure
---
main.go => cmd/command.go | 33 ++-
main.go => cmd/main.go | 22 +-
configs/{config.yaml => satellite_config.yaml} | 61 ++--
dist/LICENSE | 4 +-
dist/licenses/LICENSE-cli | 21 ++
dist/licenses/LICENSE-viper | 21 ++
go.mod | 6 +-
go.sum | 306 +++++++++++++++++++++
main.go => internal/pkg/constant/constant.go | 15 +-
internal/pkg/event/event.go | 8 +-
internal/pkg/{logger => log}/log.go | 92 +++++--
internal/pkg/{logger => log}/log_test.go | 44 ++-
internal/pkg/plugin/default_plugin.go | 76 +++++
main.go => internal/pkg/plugin/definition.go | 19 +-
internal/pkg/plugin/plugin_test.go | 101 +++++++
internal/pkg/plugin/registry.go | 65 +++--
main.go => internal/pkg/test/path.go | 18 +-
internal/satellite/boot/boot.go | 105 +++++++
.../satellite/config/clientmanager_config.go | 34 +--
.../satellite/config/gatherer_config.go | 44 +--
internal/satellite/config/loader.go | 68 +++++
internal/satellite/config/loader_test.go | 133 +++++++++
.../satellite/config/processor_config.go | 23 +-
internal/satellite/config/satellite_config.go | 51 ++++
.../satellite/config/sender_config.go | 36 ++-
internal/satellite/module/buffer/buffer.go | 94 +++++++
.../satellite/module/buffer/buffer_test.go | 65 ++---
internal/satellite/module/clientmanager.go | 182 ++++++++++++
internal/satellite/module/definition.go | 98 +++++++
internal/satellite/module/gatherer.go | 112 ++++++++
internal/satellite/module/processor.go | 115 ++++++++
internal/satellite/module/sender.go | 181 ++++++++++++
internal/pkg/plugin/define.go => plugins/README.md | 28 +-
plugins/client/api/client.go | 19 +-
plugins/client/example/client.go | 67 -----
plugins/collector/api/collector.go | 17 +-
plugins/collector/example/collector.go | 70 -----
plugins/collector/example/collector_test.go | 71 -----
plugins/fallbacker/api/fallbacker.go | 13 +-
plugins/fallbacker/example/fallbacker.go | 57 ----
plugins/fallbacker/example/fallbacker_test.go | 71 -----
plugins/fallbacker/timer/timer_fallbacker.go | 62 +++++
plugins/filter/api/filter.go | 10 +-
plugins/filter/example/filter_test.go | 71 -----
plugins/forwarder/api/forwarder.go | 27 +-
plugins/forwarder/example/forwarder.go | 62 -----
plugins/forwarder/example/forwarder_test.go | 71 -----
plugins/parser/api/parser.go | 8 +-
plugins/parser/example/parser.go | 62 -----
plugins/parser/example/parser_test.go | 71 -----
plugins/queue/api/queue.go | 18 +-
plugins/queue/example/queue.go | 72 -----
plugins/queue/example/queue_test.go | 71 -----
53 files changed, 2171 insertions(+), 1100 deletions(-)
diff --git a/main.go b/cmd/command.go
similarity index 54%
copy from main.go
copy to cmd/command.go
index 94fbc40..0457071 100644
--- a/main.go
+++ b/cmd/command.go
@@ -17,6 +17,35 @@
package main
-func main() {
- print("OK")
+import (
+ "github.com/urfave/cli/v2"
+
+ "github.com/apache/skywalking-satellite/internal/satellite/boot"
+ "github.com/apache/skywalking-satellite/internal/satellite/config"
+)
+
+var (
+	// cmdStart is the "start" sub-command: it loads the satellite configuration
+	// file selected by the "config" flag and hands it to boot.Start.
+	cmdStart = cli.Command{
+		Name:  "start",
+		Usage: "start satellite",
+		Flags: []cli.Flag{
+			&cli.StringFlag{
+				// urfave/cli v2 does not accept the v1 "name, alias" form in
+				// Name; the short form has to be declared through Aliases.
+				Name:    "config",
+				Aliases: []string{"c"},
+				Usage:   "Load configuration from `FILE`",
+				// NOTE(review): MOSN_CONFIG looks like a leftover from another
+				// project; kept unchanged for compatibility - confirm the intended name.
+				EnvVars: []string{"MOSN_CONFIG"},
+				Value:   "configs/satellite_config.yaml",
+			},
+		},
+		// Action returns whatever error boot.Start reports so the caller of
+		// app.Run can react to a failed start.
+		Action: func(c *cli.Context) error {
+			return boot.Start(loadConfig(c))
+		},
+	}
+)
+
+// loadConfig reads the path given by the "config" flag and loads the
+// satellite configuration from it via config.Load.
+func loadConfig(c *cli.Context) *config.SatelliteConfig {
+	return config.Load(c.String("config"))
}
diff --git a/main.go b/cmd/main.go
similarity index 61%
copy from main.go
copy to cmd/main.go
index 94fbc40..8c2d809 100644
--- a/main.go
+++ b/cmd/main.go
@@ -17,6 +17,26 @@
package main
+import (
+ "os"
+ "time"
+
+ "github.com/urfave/cli/v2"
+)
+
+// Version is the version string reported by the CLI. It defaults to "latest"
+// and will be initialized when building.
+var Version = "latest"
+
func main() {
- print("OK")
+	// Build the command-line application and register the "start" command.
+	app := cli.NewApp()
+	app.Name = "SkyWalking-Satellite"
+	app.Version = Version
+	// time.Now() records when the process started, not when the binary was built.
+	app.Compiled = time.Now()
+	app.Usage = "Satellite is for collecting APM data."
+	app.Description = "A lightweight collector/sidecar could be deployed closing to the target monitored system, to collect metrics, traces, and logs."
+	app.Commands = []*cli.Command{
+		&cmdStart,
+	}
+	// Without a sub-command, show the help text.
+	app.Action = cli.ShowAppHelp
+	// Do not swallow a failed run: report it on stderr and exit non-zero so
+	// shells and supervisors can tell that the satellite did not start.
+	if err := app.Run(os.Args); err != nil {
+		_, _ = os.Stderr.WriteString(err.Error() + "\n")
+		os.Exit(1)
+	}
}
diff --git a/configs/config.yaml b/configs/satellite_config.yaml
similarity index 54%
rename from configs/config.yaml
rename to configs/satellite_config.yaml
index 232aecb..68caab2 100644
--- a/configs/config.yaml
+++ b/configs/satellite_config.yaml
@@ -15,34 +15,35 @@
# limitations under the License.
#
-Gatherer:
- - name: segment-receiver
- type: segment-receiver
- config:
- key: value
- key2: value2
- queue:
- type: mmap-queue
- config:
+logger:
+ log_pattern: "%time [%level][%field] - %msg"
+ time_pattern: "2006-01-02 15:04:05.001"
+ level: "info"
+
+agents:
+ - common_config:
+ namespace: namespace1
+ gatherer:
+ collector:
+ plugin_name: segment-receiver
+ k: v
+ queue:
+ plugin_name: mmap-queue
key: value
- key2: value2
- processor: processor1
-Processor:
- - name: processor1
- filters:
- - filtername1
- - filtername2
- - filtername3
-Sender:
- client:
- name: gRPC-client
- type: gRPC
- config:
- key1: value1
- key2: value2
- forwarders:
- - type: segment-forwarder
- eventType: segment
- config:
- key1: value1
- key2: value2
\ No newline at end of file
+ processor:
+ filters:
+ - plugin_name: filtertype1
+ key: value
+ sender:
+ flush_time: 5000
+ max_buffer_size: 100
+ forwarders:
+ - plugin_name: segment-forwarder
+ key: value
+ - common_config:
+ namespace: sharing
+ client_manager:
+ retry_interval: 5000
+ client:
+ plugin_name: "grpc"
+ k: v
diff --git a/dist/LICENSE b/dist/LICENSE
index 2ab0d2f..618fe55 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -231,4 +231,6 @@ MIT licenses
The following components are provided under the MIT License. See project link for details.
The text of each license is also included at licenses/LICENSE-[project].txt.
- sirupsen (logrus) 1.7.0: https://github.com/sirupsen/logrus MIT
+ sirupsen (logrus) v1.7.0: https://github.com/sirupsen/logrus MIT
+ spf13 (viper) v1.7.1: https://github.com/spf13/viper MIT
+ urfave (cli) v2.3.0: https://github.com/urfave/cli MIT
diff --git a/dist/licenses/LICENSE-cli b/dist/licenses/LICENSE-cli
new file mode 100644
index 0000000..17356d3
--- /dev/null
+++ b/dist/licenses/LICENSE-cli
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2016 Jeremy Saenz & Contributors
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/dist/licenses/LICENSE-viper b/dist/licenses/LICENSE-viper
new file mode 100644
index 0000000..4527efb
--- /dev/null
+++ b/dist/licenses/LICENSE-viper
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2014 Steve Francia
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 71fd69d..438ceff 100644
--- a/go.mod
+++ b/go.mod
@@ -2,4 +2,8 @@ module github.com/apache/skywalking-satellite
go 1.14
-require github.com/sirupsen/logrus v1.7.0
+require (
+ github.com/sirupsen/logrus v1.7.0
+ github.com/spf13/viper v1.7.1
+ github.com/urfave/cli/v2 v2.3.0
+)
diff --git a/go.sum b/go.sum
index 4d74a1e..89357d1 100644
--- a/go.sum
+++ b/go.sum
@@ -1,10 +1,316 @@
+cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
+cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU=
+cloud.google.com/go v0.44.2/go.mod h1:60680Gw3Yr4ikxnPRS/oxxkBccT6SA1yMk63TGekxKY=
+cloud.google.com/go v0.45.1/go.mod h1:RpBamKRgapWJb87xiFSdk4g1CME7QZg3uwTez+TSTjc=
+cloud.google.com/go v0.46.3/go.mod h1:a6bKKbmY7er1mI7TEI4lsAkts/mkhTSZK8w33B4RAg0=
+cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
+cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
+cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk=
+cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
+cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
+dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
+github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
+github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
+github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
+github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
+github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
+github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
+github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
+github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
+github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
+github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
+github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
+github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
+github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
+github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
+github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
+github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
+github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
+github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY=
+github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
+github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
+github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
+github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
+github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
+github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
+github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
+github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
+github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
+github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
+github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
+github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
+github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
+github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
+github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
+github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
+github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
+github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
+github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
+github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
+github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
+github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
+github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
+github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
+github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
+github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
+github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
+github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
+github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
+github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
+github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
+github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
+github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
+github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
+github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
+github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
+github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
+github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
+github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
+github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
+github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
+github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
+github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
+github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
+github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
+github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
+github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
+github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
+github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
+github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
+github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
+github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
+github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
+github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
+github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
+github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
+github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
+github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
+github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
+github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
+github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
+github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
+github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
+github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
+github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
+github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
+github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
+github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
+github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
+github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
+github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
+github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
+github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
+github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
+github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
+github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
+github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
+github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
+github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
+github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
+github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
+github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q=
+github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
+github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
+github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
+github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
+github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
+github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
+github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
+github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
+github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
+github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
+github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
+github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
+github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
+github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
+github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
+github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
+github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
+github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
+github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
+github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
+github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
+github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
+github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
+github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M=
+github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
+github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
+go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
+go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
+go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
+go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
+golang.org/x/exp v0.0.0-20190829153037-c13cbed26979/go.mod h1:86+5VVa7VpoJ4kLfm080zCjGlMRFzhUhsZKEZO7MGek=
+golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY=
+golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
+golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
+golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
+golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
+golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
+golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
+golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
+golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
+golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
+golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
+golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
+golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
+golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
+golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
+golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
+golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc h1:NCy3Ohtk6Iny5V/reW2Ktypo4zIpWBdRJ1uFMjBxdg8=
+golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
+google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
+google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
+google.golang.org/api v0.9.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
+google.golang.org/api v0.13.0/go.mod h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsbkAI=
+google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
+google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
+google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
+google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
+google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
+google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
+google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
+google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8=
+google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
+google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
+google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
+google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
+gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
+gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
+gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
+gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
+gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
+gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
+gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
+rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
diff --git a/main.go b/internal/pkg/constant/constant.go
similarity index 77%
copy from main.go
copy to internal/pkg/constant/constant.go
index 94fbc40..f1ce73b 100644
--- a/main.go
+++ b/internal/pkg/constant/constant.go
@@ -15,8 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-package main
+package constant
-func main() {
- print("OK")
-}
+const (
+ // project name
+ ProjectName = "skywalking-satellite"
+
+ // Module names
+ ClientManagerModule = "client-manager"
+ GathererModule = "gatherer"
+ ProcessorModule = "processor"
+ SenderModule = "sender"
+)
diff --git a/internal/pkg/event/event.go b/internal/pkg/event/event.go
index c8ff0f2..20c6187 100644
--- a/internal/pkg/event/event.go
+++ b/internal/pkg/event/event.go
@@ -73,20 +73,18 @@ type BatchEvents []Event
// OutputEventContext is a container to store the output context.
type OutputEventContext struct {
- context map[string]Event
+ Context map[string]Event
Offset int64
}
// Put puts the incoming event into the context when the event is a remote event.
func (c *OutputEventContext) Put(event Event) {
- if event.IsRemote() {
- c.context[event.Name()] = event
- }
+ c.Context[event.Name()] = event
}
// Get returns a event in the context. When the eventName does not exist, a error would be returned.
func (c *OutputEventContext) Get(eventName string) (Event, error) {
- e, ok := c.context[eventName]
+ e, ok := c.Context[eventName]
if !ok {
err := fmt.Errorf("cannot find the event name in OutputEventContext : %s", eventName)
return nil, err
diff --git a/internal/pkg/logger/log.go b/internal/pkg/log/log.go
similarity index 51%
rename from internal/pkg/logger/log.go
rename to internal/pkg/log/log.go
index a62128d..9918dd4 100644
--- a/internal/pkg/logger/log.go
+++ b/internal/pkg/log/log.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package logger
+package log
import (
"fmt"
@@ -32,57 +32,97 @@ const (
defaultTimePattern = "2006-01-02 15:04:05.001"
)
+// LoggerConfig initializes the global logger config.
+type LoggerConfig struct {
+ LogPattern string `mapstructure:"log_pattern"`
+ TimePattern string `mapstructure:"time_pattern"`
+ Level string `mapstructure:"level"`
+}
+
+// FormatOption is a function to set formatter config.
+type FormatOption func(f *formatter)
+
+// ConfigOption is a function to set logger config.
+type ConfigOption func(l *logrus.Logger)
+
type formatter struct {
logPattern string
timePattern string
}
-// Option is a function to set formatter config.
-type Option func(f *formatter)
-
-// Log is the global logger.
-var Log *logrus.Logger
+// Logger is the global logger.
+var Logger *logrus.Logger
var once sync.Once
-// The Log init method, keep Log as a singleton.
-func Init(opts ...Option) {
+func Init(cfg *LoggerConfig) {
once.Do(func() {
- if Log == nil {
- Log = logrus.New()
- }
- Log.SetOutput(os.Stdout)
- Log.SetLevel(logrus.InfoLevel)
- f := &formatter{}
- for _, opt := range opts {
- opt(f)
- }
- if f.logPattern == "" {
- f.logPattern = defaultLogPattern
+ var configOpts []ConfigOption
+ var formatOpts []FormatOption
+
+ if cfg.Level != "" {
+ configOpts = append(configOpts, SetLevel(cfg.Level))
+ } else {
+ configOpts = append(configOpts, SetLevel(logrus.InfoLevel.String()))
}
- if f.timePattern == "" {
- f.timePattern = defaultTimePattern
+ if cfg.TimePattern != "" {
+ formatOpts = append(formatOpts, SetTimePattern(cfg.TimePattern))
+ } else {
+ formatOpts = append(formatOpts, SetTimePattern(defaultTimePattern))
}
- if !strings.Contains(f.logPattern, "\n") {
- f.logPattern += "\n"
+ if cfg.LogPattern != "" {
+ formatOpts = append(formatOpts, SetLogPattern(cfg.LogPattern))
+ } else {
+ formatOpts = append(formatOpts, SetLogPattern(defaultLogPattern))
}
- Log.SetFormatter(f)
+ initBySettings(configOpts, formatOpts)
})
}
+// initBySettings creates the global Logger, applies the logger options to it and installs a formatter built from the format options.
+func initBySettings(configOpts []ConfigOption, formatOpts []FormatOption) {
+ // Default Logger.
+ Logger = logrus.New()
+ Logger.SetOutput(os.Stdout)
+ for _, opt := range configOpts {
+ opt(Logger)
+ }
+ // Default formatter.
+ f := &formatter{}
+ for _, opt := range formatOpts {
+ opt(f)
+ }
+ if !strings.Contains(f.logPattern, "\n") {
+ f.logPattern += "\n"
+ }
+ Logger.SetFormatter(f)
+}
+
// Set the log pattern in formatter.
-func SetLogPattern(logPattern string) Option {
+func SetLogPattern(logPattern string) FormatOption {
return func(f *formatter) {
f.logPattern = logPattern
}
}
// Set the time pattern in formatter.
-func SetTimePattern(timePattern string) Option {
+func SetTimePattern(timePattern string) FormatOption {
return func(f *formatter) {
f.timePattern = timePattern
}
}
+// SetLevel returns a ConfigOption that sets the logger level parsed from levelStr.
+// When levelStr is not a valid logrus level, a notice is printed to stdout and
+// the level falls back to info.
+func SetLevel(levelStr string) ConfigOption {
+	return func(logger *logrus.Logger) {
+		level, err := logrus.ParseLevel(levelStr)
+		if err != nil {
+			// Terminate the notice with a newline so it does not run into the next output line.
+			fmt.Printf("logger level does not exist: %s, level would be set info\n", levelStr)
+			level = logrus.InfoLevel
+		}
+		logger.SetLevel(level)
+	}
+}
+
// Format supports unified log output format that has %time, %level, %field and %msg.
func (f *formatter) Format(entry *logrus.Entry) ([]byte, error) {
output := f.logPattern
diff --git a/internal/pkg/logger/log_test.go b/internal/pkg/log/log_test.go
similarity index 66%
rename from internal/pkg/logger/log_test.go
rename to internal/pkg/log/log_test.go
index d8ad895..2c690d8 100644
--- a/internal/pkg/logger/log_test.go
+++ b/internal/pkg/log/log_test.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package logger
+package log
import (
"reflect"
@@ -26,7 +26,13 @@ import (
)
func TestFormatter_Format(t *testing.T) {
- Init(SetLogPattern("[%time][%level][%field] - %msg"), SetTimePattern("2006-01-02 15:04:05,001"))
+ initBySettings([]ConfigOption{
+ SetLevel("warn"),
+ },
+ []FormatOption{
+ SetLogPattern("[%time][%level][%field] - %msg"),
+ SetTimePattern("2006-01-02 15:04:05,001"),
+ })
type args struct {
entry *logrus.Entry
}
@@ -41,7 +47,7 @@ func TestFormatter_Format(t *testing.T) {
want: []byte("[2020-12-12 12:12:12,012][trace][] - entry1\n"),
args: args{
entry: func() *logrus.Entry {
- entry := Log.WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
+ entry := Logger.WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
entry.Level = logrus.TraceLevel
entry.Message = "entry1"
return entry
@@ -53,7 +59,7 @@ func TestFormatter_Format(t *testing.T) {
want: []byte("[2020-12-12 12:12:12,012][warning][a=b] - entry2\n"),
args: args{
entry: func() *logrus.Entry {
- entry := Log.WithField("a", "b").WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
+ entry := Logger.WithField("a", "b").WithTime(time.Date(2020, 12, 12, 12, 12, 12, 12, time.Local).Local())
entry.Level = logrus.WarnLevel
entry.Message = "entry2"
return entry
@@ -64,7 +70,7 @@ func TestFormatter_Format(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- f := Log.Formatter
+ f := Logger.Formatter
got, _ := f.Format(tt.args.entry)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("Format() got = %s, want %s", got, tt.want)
@@ -72,3 +78,31 @@ func TestFormatter_Format(t *testing.T) {
})
}
}
+
+func TestSetLevel(t *testing.T) {
+ type args struct {
+ opts ConfigOption
+ }
+ tests := []struct {
+ name string
+ args args
+ want logrus.Level
+ }{
+ {
+ name: "info",
+ args: args{
+ opts: SetLevel("warn"),
+ },
+ want: logrus.WarnLevel,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ logger := logrus.New()
+ tt.args.opts(logger)
+ if logger.Level != tt.want {
+ t.Errorf("SetLevel() got = %s, want %s", logger.Level, tt.want)
+ }
+ })
+ }
+}
diff --git a/internal/pkg/plugin/default_plugin.go b/internal/pkg/plugin/default_plugin.go
new file mode 100644
index 0000000..cab5adc
--- /dev/null
+++ b/internal/pkg/plugin/default_plugin.go
@@ -0,0 +1,76 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plugin
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/spf13/viper"
+)
+
+// DefaultPluginNameField is a required field in DefaultConfig.
+const DefaultPluginNameField = "plugin_name"
+
+// DefaultInitializingPlugin defines the plugins initialized by defaultInitializing.
+type DefaultInitializingPlugin interface {
+ Plugin
+ // DefaultConfig returns the default config, that is a YAML pattern.
+ DefaultConfig() string
+}
+
+// DefaultConfig is used to initialize the DefaultInitializingPlugin.
+type DefaultConfig map[string]interface{}
+
+// defaultNameFinder is used to get the plugin name in DefaultConfig.
+// It panics when cfg is not a DefaultConfig, or when the DefaultPluginNameField
+// entry is missing or is not a string.
+func defaultNameFinder(cfg interface{}) string {
+	c, ok := cfg.(DefaultConfig)
+	if !ok {
+		panic(fmt.Errorf("defaultNameFinder only supports DefaultConfig"))
+	}
+	v, ok := c[DefaultPluginNameField]
+	if !ok {
+		panic(fmt.Errorf("%s is required in DefaultConfig", DefaultPluginNameField))
+	}
+	// Check the assertion explicitly so a non-string value produces a descriptive
+	// error instead of an opaque runtime type-assertion panic.
+	name, ok := v.(string)
+	if !ok {
+		panic(fmt.Errorf("%s in DefaultConfig must be a string, but got %T", DefaultPluginNameField, v))
+	}
+	return name
+}
+
+// defaultInitializing initializes the plugin fields by field mapping: the
+// plugin's DefaultConfig YAML is read first, the custom cfg is merged on top of
+// it, and the result is unmarshalled into the plugin.
+// It panics when cfg is not a DefaultConfig, when plugin is not a
+// DefaultInitializingPlugin, or when reading, merging or unmarshalling fails.
+func defaultInitializing(plugin Plugin, cfg interface{}) {
+	c, ok := cfg.(DefaultConfig)
+	if !ok {
+		panic(fmt.Errorf("%s plugin is a DefaultInitializingPlugin, but the type of configuration is illegal", plugin.Name()))
+	}
+	// Check the assertion explicitly so a wrong plugin type yields a descriptive error.
+	p, ok := plugin.(DefaultInitializingPlugin)
+	if !ok {
+		panic(fmt.Errorf("%s plugin is not a DefaultInitializingPlugin", plugin.Name()))
+	}
+	v := viper.New()
+	v.SetConfigType("yaml")
+	if defaults := p.DefaultConfig(); defaults != "" {
+		if err := v.ReadConfig(strings.NewReader(defaults)); err != nil {
+			panic(fmt.Errorf("cannot read default config in the plugin: %s, the error is %v", plugin.Name(), err))
+		}
+	}
+	if err := v.MergeConfigMap(c); err != nil {
+		panic(fmt.Errorf("%s plugin cannot merge the custom configuration, the error is %v", plugin.Name(), err))
+	}
+	if err := v.Unmarshal(plugin); err != nil {
+		panic(fmt.Errorf("cannot inject the config to the %s plugin, the error is %v", plugin.Name(), err))
+	}
+}
+
+// defaultCallBack does nothing.
+func defaultCallBack(plugin Plugin) {}
diff --git a/main.go b/internal/pkg/plugin/definition.go
similarity index 58%
copy from main.go
copy to internal/pkg/plugin/definition.go
index 94fbc40..bf73689 100644
--- a/main.go
+++ b/internal/pkg/plugin/definition.go
@@ -15,8 +15,21 @@
// specific language governing permissions and limitations
// under the License.
-package main
+package plugin
-func main() {
- print("OK")
+// Plugin defines the plugin model in Satellite.
+type Plugin interface {
+ // Name returns the name of the specific plugin.
+ Name() string
+ // Description returns the description of the specific plugin.
+ Description() string
}
+
+// NameFinderFunc is used to get the plugin name from different plugin configs.
+type NameFinderFunc func(config interface{}) string
+
+// InitializingFunc is used to initialize the specific plugin.
+type InitializingFunc func(plugin Plugin, config interface{})
+
+// CallbackFunc would be invoked after initializing.
+type CallbackFunc func(plugin Plugin)
diff --git a/internal/pkg/plugin/plugin_test.go b/internal/pkg/plugin/plugin_test.go
new file mode 100644
index 0000000..a14eb3b
--- /dev/null
+++ b/internal/pkg/plugin/plugin_test.go
@@ -0,0 +1,101 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plugin
+
+import (
+ "reflect"
+ "testing"
+)
+
+type DemoCategory interface {
+ DefaultInitializingPlugin
+ Say() string
+}
+
+type DemoPlugin struct {
+ Organization string `mapstructure:"organization"`
+ Project string `mapstructure:"project"`
+}
+
+func (d *DemoPlugin) Say() string {
+ return d.Organization + ":" + d.Project
+}
+
+func (d *DemoPlugin) Name() string {
+ return "demoplugin"
+}
+
+func (d *DemoPlugin) Description() string {
+ return "this is just a demo"
+}
+
+func (d *DemoPlugin) DefaultConfig() string {
+ return `
+organization: "ASF"
+project: "skywalking-satellite"
+`
+}
+
+// TestPlugin checks that Get builds a DemoPlugin whose fields come from the
+// custom config when it provides them, and from DefaultConfig() otherwise.
+func TestPlugin(t *testing.T) {
+	tests := []struct {
+		name string
+		args DefaultConfig
+		want *DemoPlugin
+	}{
+		{
+			name: "test1",
+			args: DefaultConfig{
+				"plugin_name":  "demoplugin",
+				"organization": "CNCF",
+				"project":      "Fluentd",
+			},
+			want: &DemoPlugin{
+				Organization: "CNCF",
+				Project:      "Fluentd",
+			},
+		},
+		{
+			name: "demoplugin",
+			args: DefaultConfig{
+				"plugin_name": "demoplugin",
+			},
+			want: &DemoPlugin{
+				Organization: "ASF",
+				Project:      "skywalking-satellite",
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			defer func() {
+				// Get panics on lookup failures; report the recovered value so the
+				// real cause is visible in the test output.
+				if r := recover(); r != nil {
+					t.Errorf("getting the plugin %s panicked: %v", "demoplugin", r)
+				}
+			}()
+			plugin := Get(reflect.TypeOf((*DemoCategory)(nil)).Elem(), tt.args)
+			if !reflect.DeepEqual(plugin, tt.want) {
+				t.Errorf("Get() got = %v, want %v", plugin, tt.want)
+			}
+		})
+	}
+}
+
+func init() {
+ RegisterPluginCategory(reflect.TypeOf((*DemoCategory)(nil)).Elem(), nil, nil, nil)
+ RegisterPlugin(&DemoPlugin{})
+}
diff --git a/internal/pkg/plugin/registry.go b/internal/pkg/plugin/registry.go
index 4731d36..9059d66 100644
--- a/internal/pkg/plugin/registry.go
+++ b/internal/pkg/plugin/registry.go
@@ -23,57 +23,86 @@ import (
"sync"
)
-// All plugins is wrote in ./plugins dir. The plugin type would be as the next level dirs,
-// such as collector, client, or queue. And the 3rd level is the plugin name, that is also
-// used as key in pluginRegistry.
-
-// reg is the global plugin registry
+// the global plugin registry
var (
- reg map[reflect.Type]map[string]reflect.Value
- lock sync.Mutex
+ lock sync.Mutex
+ reg map[reflect.Type]map[string]reflect.Value
+ initFuncReg map[reflect.Type]InitializingFunc
+ callbackFuncReg map[reflect.Type]CallbackFunc
+ nameFinderFuncReg map[reflect.Type]NameFinderFunc
)
func init() {
reg = make(map[reflect.Type]map[string]reflect.Value)
+ initFuncReg = make(map[reflect.Type]InitializingFunc)
+ callbackFuncReg = make(map[reflect.Type]CallbackFunc)
+ nameFinderFuncReg = make(map[reflect.Type]NameFinderFunc)
}
-// Add new plugin category. The different plugin category could have same plugin names.
-func AddPluginCategory(pluginCategory reflect.Type) {
+// RegisterPluginCategory registers a new plugin category.
+// Required:
+//   pluginCategory: the plugin interface type.
+// Optional (a nil value selects the default):
+//   n: the plugin name finder, and the default value is defaultNameFinder.
+//   i: the plugin initializer, and the default value is defaultInitializing.
+//   c: the plugin initializer callback func, and the default value is defaultCallBack.
+func RegisterPluginCategory(pluginCategory reflect.Type, n NameFinderFunc, i InitializingFunc, c CallbackFunc) {
lock.Lock()
defer lock.Unlock()
reg[pluginCategory] = map[string]reflect.Value{}
+
+ if n == nil {
+ nameFinderFuncReg[pluginCategory] = defaultNameFinder
+ } else {
+ nameFinderFuncReg[pluginCategory] = n
+ }
+ if i == nil {
+ initFuncReg[pluginCategory] = defaultInitializing
+ } else {
+ initFuncReg[pluginCategory] = i
+ }
+ if c == nil {
+ callbackFuncReg[pluginCategory] = defaultCallBack
+ } else {
+ callbackFuncReg[pluginCategory] = c
+ }
}
// RegisterPlugin registers the pluginType as plugin.
// If the plugin is a pointer receiver, please pass a pointer. Otherwise, please pass a value.
-func RegisterPlugin(pluginName string, plugin interface{}) {
+func RegisterPlugin(plugin Plugin) {
lock.Lock()
defer lock.Unlock()
v := reflect.ValueOf(plugin)
success := false
for pCategory, pReg := range reg {
if v.Type().Implements(pCategory) {
- pReg[pluginName] = v
- fmt.Printf("register %s %s successfully ", pluginName, v.Type().String())
+ pReg[plugin.Name()] = v
+ fmt.Printf("register %s %s successfully ", plugin.Name(), v.Type().String())
success = true
}
}
if !success {
- fmt.Printf("this type of %s is not supported to register : %s", pluginName, v.Type().String())
+ fmt.Printf("this type of %s is not supported to register : %s", plugin.Name(), v.Type().String())
}
}
-// Get the specific plugin according to the pluginCategory and pluginName.
-func Get(pluginCategory reflect.Type, pluginName string, config map[string]interface{}) Plugin {
- value, ok := reg[pluginCategory][pluginName]
+// Get an initialized specific plugin according to the pluginCategory and pluginName.
+func Get(category reflect.Type, cfg interface{}) Plugin {
+ lock.Lock()
+ defer lock.Unlock()
+ pluginName := nameFinderFuncReg[category](cfg)
+ value, ok := reg[category][pluginName]
if !ok {
- panic(fmt.Errorf("cannot find %s plugin, and the category of plugin is %s", pluginName, pluginCategory))
+ panic(fmt.Errorf("cannot find %s plugin, and the category of plugin is %s", pluginName, category))
}
t := value.Type()
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
+
plugin := reflect.New(t).Interface().(Plugin)
- plugin.InitPlugin(config)
+ initFuncReg[category](plugin, cfg)
+ callbackFuncReg[category](plugin)
return plugin
}
diff --git a/main.go b/internal/pkg/test/path.go
similarity index 69%
copy from main.go
copy to internal/pkg/test/path.go
index 94fbc40..cf5adcc 100644
--- a/main.go
+++ b/internal/pkg/test/path.go
@@ -15,8 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-package main
+package test
-func main() {
- print("OK")
+import (
+ "fmt"
+ "os"
+ "strings"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/constant"
+)
+
+// FindRootPath returns the path of the project root directory, located by
+// searching the current working directory for constant.ProjectName.
+// An error is returned when the working directory cannot be read or when it
+// does not contain the project name.
+func FindRootPath() (string, error) {
+	pwd, err := os.Getwd()
+	if err != nil {
+		return "", fmt.Errorf("could not find the project root path: %v", err)
+	}
+	// strings.Index returns -1 when the project name is absent; slicing with that
+	// value would yield a wrong prefix or panic on a short path.
+	idx := strings.Index(pwd, constant.ProjectName)
+	if idx < 0 {
+		return "", fmt.Errorf("could not find the project root path: %s is not in %s", constant.ProjectName, pwd)
+	}
+	return pwd[:idx+len(constant.ProjectName)], nil
}
diff --git a/internal/satellite/boot/boot.go b/internal/satellite/boot/boot.go
new file mode 100644
index 0000000..b8ae2db
--- /dev/null
+++ b/internal/satellite/boot/boot.go
@@ -0,0 +1,105 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package boot
+
+import (
+ "context"
+ "os"
+ "sync"
+ "syscall"
+
+ "os/signal"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+ "github.com/apache/skywalking-satellite/internal/satellite/config"
+ "github.com/apache/skywalking-satellite/internal/satellite/module"
+)
+
+// Start boots Satellite with the given configuration. It installs the shutdown
+// signal listener, initializes the logger and the modules, prepares every module
+// and then boots them. The error reported by prepareModules, if any, is returned
+// and nothing is booted in that case.
+func Start(cfg *config.SatelliteConfig) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ addShutdownListener(cancel)
+ initLogger(cfg)
+ initModules(cfg)
+ if err := prepareModules(); err != nil {
+ return err
+ }
+ bootModules(ctx)
+ return nil
+}
+
+// addShutdownListener starts a goroutine that invokes cancel once the process
+// receives an interrupt, hang-up or termination signal.
+func addShutdownListener(cancel context.CancelFunc) {
+	signals := make(chan os.Signal, 1)
+	// os.Interrupt is the same signal as syscall.SIGINT, so it is registered only once.
+	signal.Notify(signals, os.Interrupt, syscall.SIGHUP, syscall.SIGTERM)
+	go func() {
+		<-signals
+		cancel()
+	}()
+}
+
+// initLogger initializes the global logger from the Logger section of cfg.
+func initLogger(cfg *config.SatelliteConfig) {
+ log.Init(cfg.Logger)
+}
+
+// initModules walks every agent config in cfg.Agents and calls
+// module.NewModuleService for each module config that is not nil, in the order
+// client manager, sender, processor, gatherer.
+// NOTE(review): presumably NewModuleService also registers the created service
+// in the module container read by prepareModules and bootModules — confirm.
+func initModules(cfg *config.SatelliteConfig) {
+ log.Logger.Infof("satellite is initializing...")
+ for _, agentConfig := range cfg.Agents {
+ if agentConfig.ClientManager != nil {
+ module.NewModuleService(agentConfig.ClientManager)
+ }
+ if agentConfig.Sender != nil {
+ module.NewModuleService(agentConfig.Sender)
+ }
+ if agentConfig.Processor != nil {
+ module.NewModuleService(agentConfig.Processor)
+ }
+ if agentConfig.Gatherer != nil {
+ module.NewModuleService(agentConfig.Gatherer)
+ }
+ }
+}
+
+// prepareModules calls Prepare on every service in the module container so that
+// all modules are in a bootable state. It stops at the first failure, logs the
+// module name and namespace together with the error, and returns that error.
+func prepareModules() error {
+ log.Logger.Infof("satellite is prepare to start...")
+ for _, service := range module.GetModuleContainer() {
+ if err := service.Prepare(); err != nil {
+ log.Logger.Errorf("%s module of %s namespace is error in preparing stage, error is %v", service.Name(), service.Config().NameSpace(), err)
+ return err
+ }
+ }
+ return nil
+}
+
+// bootModules boots every service in the module container in its own goroutine
+// and blocks until all Boot calls have returned.
+func bootModules(ctx context.Context) {
+	log.Logger.Infof("satellite is starting...")
+	var wg sync.WaitGroup
+	for _, service := range module.GetModuleContainer() {
+		// Shadow the loop variable so each goroutine captures its own service.
+		service := service
+		// Register with the WaitGroup before spawning the goroutine: calling Add
+		// inside the goroutine races with Wait, which could then return early.
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			service.Boot(ctx)
+		}()
+	}
+	wg.Wait()
+}
diff --git a/plugins/client/api/client.go b/internal/satellite/config/clientmanager_config.go
similarity index 58%
copy from plugins/client/api/client.go
copy to internal/satellite/config/clientmanager_config.go
index 51d2b65..3f8bdfa 100644
--- a/plugins/client/api/client.go
+++ b/internal/satellite/config/clientmanager_config.go
@@ -15,33 +15,29 @@
// specific language governing permissions and limitations
// under the License.
-package api
+package config
import (
- "reflect"
-
+ "github.com/apache/skywalking-satellite/internal/pkg/constant"
"github.com/apache/skywalking-satellite/internal/pkg/plugin"
)
-// Client is a plugin interface, that defines new clients, such as gRPC client and Kafka client.
-type Client interface {
- plugin.Plugin
+// ClientManagerConfig defines the initialization params for ClientManager.
+type ClientManagerConfig struct {
+ // common config
+ // NOTE(review): this field is unexported and has no mapstructure tag; nothing
+ // in this file assigns it — confirm where the namespace is populated.
+ common ModuleCommonConfig
- // Prepare would make connection with outer service.
- Prepare()
- // GetConnection returns the connected client to publish events.
- GetConnectedClient() interface{}
- // Close the connection with outer service.
- Close()
-}
+ // plugins config
+ ClientConfig plugin.DefaultConfig `mapstructure:"client"` // the client plugin config
-var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
+ // self config
+ RetryInterval int64 `mapstructure:"retry_interval"` // the client retry interval when disconnected.
+}
-// Get client plugin.
-func GetClient(pluginName string, config map[string]interface{}) Client {
- return plugin.Get(ClientCategory, pluginName, config).(Client)
+// ModuleName returns the ClientManager module name constant.
+func (c *ClientManagerConfig) ModuleName() string {
+ return constant.ClientManagerModule
}
-func init() {
- plugin.AddPluginCategory(ClientCategory)
+// NameSpace returns the running namespace stored in the common config.
+func (c *ClientManagerConfig) NameSpace() string {
+ return c.common.RunningNamespace
}
diff --git a/plugins/filter/example/filter.go b/internal/satellite/config/gatherer_config.go
similarity index 56%
rename from plugins/filter/example/filter.go
rename to internal/satellite/config/gatherer_config.go
index 14abb69..ded14e1 100644
--- a/plugins/filter/example/filter.go
+++ b/internal/satellite/config/gatherer_config.go
@@ -15,40 +15,26 @@
// specific language governing permissions and limitations
// under the License.
-package example
+package config
-import "github.com/apache/skywalking-satellite/internal/pkg/event"
+import (
+ "github.com/apache/skywalking-satellite/internal/pkg/constant"
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
-type demoFilter struct {
- a string
-}
-
-type demoFilter2 struct {
- a string
-}
-
-type demoFilter3 struct {
- a string
-}
-
-func (d *demoFilter) Description() string {
- panic("implement me")
-}
-
-func (d *demoFilter) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoFilter) Process(in event.Event) event.Event {
- panic("implement me")
-}
+// GathererConfig defines the initialization params for the Gatherer module.
+type GathererConfig struct {
+ // common config
+ // NOTE(review): this field is unexported and has no mapstructure tag; nothing
+ // in this file assigns it — confirm where the namespace is populated.
+ common ModuleCommonConfig
-func (d demoFilter2) Description() string {
- panic("implement me")
+ // plugins config
+ CollectorConfig plugin.DefaultConfig `mapstructure:"collector"` // collector plugin config
+ QueueConfig plugin.DefaultConfig `mapstructure:"queue"` // queue plugin config
}
-func (d demoFilter2) InitPlugin(config map[string]interface{}) {
+// ModuleName returns the Gatherer module name constant.
+func (g *GathererConfig) ModuleName() string {
+ return constant.GathererModule
}
-func (d demoFilter2) Process(in event.Event) event.Event {
- panic("implement me")
+// NameSpace returns the running namespace stored in the common config.
+func (g *GathererConfig) NameSpace() string {
+ return g.common.RunningNamespace
}
diff --git a/internal/satellite/config/loader.go b/internal/satellite/config/loader.go
new file mode 100644
index 0000000..c9b0dac
--- /dev/null
+++ b/internal/satellite/config/loader.go
@@ -0,0 +1,68 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package config
+
+import (
+ "bytes"
+ "fmt"
+ "sync"
+
+ "io/ioutil"
+ "path/filepath"
+
+ "github.com/spf13/viper"
+)
+
+var (
+ // cfgLock serializes calls to Load.
+ cfgLock sync.Mutex
+)
+
+// Load reads the SatelliteConfig from configPath and panics when the file cannot
+// be loaded. It prints through fmt rather than the global Satellite logger
+// because it is executed before the logger is initialized. Concurrent calls are
+// serialized by cfgLock.
+func Load(configPath string) *SatelliteConfig {
+	cfgLock.Lock()
+	defer cfgLock.Unlock()
+	fmt.Printf("load config from : %s\n", configPath)
+	cfg, err := load(configPath)
+	if err != nil {
+		panic(fmt.Errorf("could not load config from the path: %s, the error is :%v", configPath, err))
+	}
+	return cfg
+}
+
+// load parses the YAML file at configPath into a SatelliteConfig. The path is
+// made absolute, the file content is fed to a fresh viper instance configured
+// for YAML, and the result is unmarshalled into the returned struct. The first
+// error encountered is returned together with a nil config.
+func load(configPath string) (*SatelliteConfig, error) {
+	absPath, err := filepath.Abs(configPath)
+	if err != nil {
+		return nil, err
+	}
+	raw, err := ioutil.ReadFile(absPath)
+	if err != nil {
+		return nil, err
+	}
+	parser := viper.New()
+	parser.SetConfigType("yaml")
+	if err = parser.ReadConfig(bytes.NewReader(raw)); err != nil {
+		return nil, err
+	}
+	result := new(SatelliteConfig)
+	if err = parser.Unmarshal(result); err != nil {
+		return nil, err
+	}
+	return result, nil
+}
diff --git a/internal/satellite/config/loader_test.go b/internal/satellite/config/loader_test.go
new file mode 100644
index 0000000..e31c7f2
--- /dev/null
+++ b/internal/satellite/config/loader_test.go
@@ -0,0 +1,133 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package config
+
+import (
+ "encoding/json"
+ "reflect"
+ "testing"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+ "github.com/apache/skywalking-satellite/internal/pkg/test"
+)
+
+// TestLoad loads configs/satellite_config.yaml from the repository root and
+// compares the parsed logger and agent sections against the expected values.
+func TestLoad(t *testing.T) {
+	rootPath, err := test.FindRootPath()
+	if err != nil {
+		// Fail through the testing framework instead of panicking so the failure
+		// is reported as a normal test failure.
+		t.Fatalf("cannot find the root path: %v", err)
+	}
+	type args struct {
+		configPath string
+	}
+	tests := []struct {
+		name string
+		args args
+		want *SatelliteConfig
+	}{
+		{
+			name: "Legal configuration",
+			args: args{configPath: rootPath + "/configs/satellite_config.yaml"},
+			want: &SatelliteConfig{
+				Logger: &log.LoggerConfig{
+					LogPattern:  "%time [%level][%field] - %msg",
+					TimePattern: "2006-01-02 15:04:05.001",
+					Level:       "info",
+				},
+				Agents: []*AgentConfig{
+					{
+						ModuleCommonConfig: &ModuleCommonConfig{
+							RunningNamespace: "namespace1",
+						},
+
+						Gatherer: &GathererConfig{
+							CollectorConfig: plugin.DefaultConfig{
+								"plugin_name": "segment-receiver",
+								"k":           "v",
+							},
+							QueueConfig: plugin.DefaultConfig{
+								"plugin_name": "mmap-queue",
+								"key":         "value",
+							},
+						},
+						Processor: &ProcessorConfig{
+							FilterConfig: []plugin.DefaultConfig{
+								{
+									"plugin_name": "filtertype1",
+									"key":         "value",
+								},
+							},
+						},
+						Sender: &SenderConfig{
+							FlushTime:     5000,
+							MaxBufferSize: 100,
+							ForwardersConfig: []plugin.DefaultConfig{
+								{
+									"plugin_name": "segment-forwarder",
+									"key":         "value",
+								},
+							},
+						},
+					},
+					{
+						ModuleCommonConfig: &ModuleCommonConfig{
+							RunningNamespace: "sharing",
+						},
+						ClientManager: &ClientManagerConfig{
+							RetryInterval: 5000,
+							ClientConfig: plugin.DefaultConfig{
+								"plugin_name": "grpc",
+								"k":           "v",
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			c, err := load(tt.args.configPath)
+			if err != nil {
+				t.Fatalf("cannot load config: %v", err)
+			}
+			// Guard the fixed indexes below so a short agent list fails the test
+			// with a message instead of an index-out-of-range panic.
+			if len(c.Agents) < len(tt.want.Agents) {
+				t.Fatalf("config has %d agents, want at least %d", len(c.Agents), len(tt.want.Agents))
+			}
+			doJudgeEqual(t, c.Logger, tt.want.Logger)
+			doJudgeEqual(t, c.Agents[0].ModuleCommonConfig, tt.want.Agents[0].ModuleCommonConfig)
+			doJudgeEqual(t, c.Agents[0].Gatherer, tt.want.Agents[0].Gatherer)
+			doJudgeEqual(t, c.Agents[0].Processor, tt.want.Agents[0].Processor)
+			doJudgeEqual(t, c.Agents[0].Sender, tt.want.Agents[0].Sender)
+
+			doJudgeEqual(t, c.Agents[1].ModuleCommonConfig, tt.want.Agents[1].ModuleCommonConfig)
+			doJudgeEqual(t, c.Agents[1].ClientManager, tt.want.Agents[1].ClientManager)
+		})
+	}
+}
+
+// doJudgeEqual fails the test immediately when a and b are not deeply equal.
+// Both values are rendered as JSON in the failure message: a is reported as
+// "got" and b as "want".
+func doJudgeEqual(t *testing.T, a, b interface{}) {
+	// Mark this function as a helper so failures are attributed to the caller's
+	// line instead of to this function.
+	t.Helper()
+	if reflect.DeepEqual(a, b) {
+		return
+	}
+	ajson, err := json.Marshal(a)
+	if err != nil {
+		t.Fatalf("cannot do json format: %v", err)
+	}
+	bjson, err := json.Marshal(b)
+	if err != nil {
+		t.Fatalf("cannot do json format: %v", err)
+	}
+	t.Fatalf("config is not equal, got %s, want %s", ajson, bjson)
+}
diff --git a/main.go b/internal/satellite/config/processor_config.go
similarity index 62%
rename from main.go
rename to internal/satellite/config/processor_config.go
index 94fbc40..dd65844 100644
--- a/main.go
+++ b/internal/satellite/config/processor_config.go
@@ -15,8 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-package main
+package config
-func main() {
- print("OK")
+import (
+ "github.com/apache/skywalking-satellite/internal/pkg/constant"
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+// ProcessorConfig defines the initialization params for the Processor module.
+type ProcessorConfig struct {
+	// common config
+	// NOTE(review): this field is an unexported pointer without a mapstructure
+	// tag; nothing in this file assigns it — confirm where it is populated.
+	common *ModuleCommonConfig
+
+	// plugins config
+	FilterConfig []plugin.DefaultConfig `mapstructure:"filters"` // filter plugins
+}
+
+// ModuleName returns the Processor module name constant.
+func (p *ProcessorConfig) ModuleName() string {
+	return constant.ProcessorModule
+}
+
+// NameSpace returns the running namespace stored in the common config, or an
+// empty string when no common config has been set.
+func (p *ProcessorConfig) NameSpace() string {
+	// common is a pointer here, so it must be nil-checked before it is
+	// dereferenced; an unset common config yields the empty namespace.
+	if p.common == nil {
+		return ""
+	}
+	return p.common.RunningNamespace
}
diff --git a/internal/satellite/config/satellite_config.go b/internal/satellite/config/satellite_config.go
new file mode 100644
index 0000000..9160cc6
--- /dev/null
+++ b/internal/satellite/config/satellite_config.go
@@ -0,0 +1,51 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package config
+
+import (
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+)
+
+// SatelliteConfig is the root configuration used to initialize Satellite. It is
+// decoded from the "logger" and "agents" keys of the configuration file.
+type SatelliteConfig struct {
+ Logger *log.LoggerConfig `mapstructure:"logger"`
+ Agents []*AgentConfig `mapstructure:"agents"`
+}
+
+// AgentConfig holds the module configurations of one agent. Every module
+// section is a pointer, so a section that is absent from the configuration stays nil.
+type AgentConfig struct {
+ ModuleCommonConfig *ModuleCommonConfig `mapstructure:"common_config"`
+ ClientManager *ClientManagerConfig `mapstructure:"client_manager"`
+ Gatherer *GathererConfig `mapstructure:"gatherer"`
+ Processor *ProcessorConfig `mapstructure:"processor"`
+ Sender *SenderConfig `mapstructure:"sender"`
+}
+
+// ModuleConfig is the interface every module configuration implements; it
+// exposes the fields needed to initialize a ModuleService.
+type ModuleConfig interface {
+ // ModuleName returns the name of the current module.
+ ModuleName() string
+ // NameSpace returns the current running namespace. Satellite supports multiple agents running in one
+ // Satellite process; the namespace is the concept used to distinguish different agents.
+ NameSpace() string
+}
+
+// ModuleCommonConfig holds the fields shared by every ModuleConfig. The running
+// namespace is decoded from the "namespace" key.
+type ModuleCommonConfig struct {
+ RunningNamespace string `mapstructure:"namespace"`
+}
diff --git a/plugins/client/api/client.go b/internal/satellite/config/sender_config.go
similarity index 52%
copy from plugins/client/api/client.go
copy to internal/satellite/config/sender_config.go
index 51d2b65..d2fa50d 100644
--- a/plugins/client/api/client.go
+++ b/internal/satellite/config/sender_config.go
@@ -15,33 +15,31 @@
// specific language governing permissions and limitations
// under the License.
-package api
+package config
import (
- "reflect"
-
+ "github.com/apache/skywalking-satellite/internal/pkg/constant"
"github.com/apache/skywalking-satellite/internal/pkg/plugin"
)
-// Client is a plugin interface, that defines new clients, such as gRPC client and Kafka client.
-type Client interface {
- plugin.Plugin
+// SenderConfig defines the initialization params for the Sender module.
+type SenderConfig struct {
+ // common config
+ // NOTE(review): this field is unexported and has no mapstructure tag; nothing
+ // in this file assigns it — confirm where the namespace is populated.
+ common ModuleCommonConfig
- // Prepare would make connection with outer service.
- Prepare()
- // GetConnection returns the connected client to publish events.
- GetConnectedClient() interface{}
- // Close the connection with outer service.
- Close()
-}
+ // plugins config
+ ForwardersConfig []plugin.DefaultConfig `mapstructure:"forwarders"` // forwarder plugins config
+ FallbackerConfig plugin.DefaultConfig `mapstructure:"fallbacker"` // fallbacker plugins config
-var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
+ // self config
+ MaxBufferSize int `mapstructure:"max_buffer_size"` // the max buffer capacity
+ MinFlushEvents int `mapstructure:"min_flush_events"` // the min flush events when receives a timer flush signal
+ FlushTime int `mapstructure:"flush_time"` // the period flush time (NOTE(review): unit is not visible here — confirm)
+}
-// Get client plugin.
-func GetClient(pluginName string, config map[string]interface{}) Client {
- return plugin.Get(ClientCategory, pluginName, config).(Client)
+// ModuleName returns the Sender module name constant.
+func (s *SenderConfig) ModuleName() string {
+ return constant.SenderModule
}
-func init() {
- plugin.AddPluginCategory(ClientCategory)
+// NameSpace returns the running namespace stored in the common config.
+func (s *SenderConfig) NameSpace() string {
+ return s.common.RunningNamespace
}
diff --git a/internal/satellite/module/buffer/buffer.go b/internal/satellite/module/buffer/buffer.go
new file mode 100644
index 0000000..ec3c59e
--- /dev/null
+++ b/internal/satellite/module/buffer/buffer.go
@@ -0,0 +1,94 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package buffer
+
+import (
+ "sync"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/event"
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+)
+
+// BatchBuffer is a fixed-capacity buffer to cache the input data in Sender.
+// The embedded mutex is taken by every method of the type.
+type BatchBuffer struct {
+ sync.Mutex // lock guarding the fields below
+ buf []*event.OutputEventContext // cache
+ first int64 // the first OutputEventContext offset
+ last int64 // the last OutputEventContext offset
+ size int // usage size
+ cap int // the max capacity
+}
+
+// NewBatchBuffer creates an empty BatchBuffer whose backing slice is allocated
+// up front with the given capacity.
+func NewBatchBuffer(capacity int) *BatchBuffer {
+	b := new(BatchBuffer)
+	b.buf = make([]*event.OutputEventContext, capacity)
+	b.cap = capacity
+	// first, last and size keep their zero values for an empty buffer.
+	return b
+}
+
+// Buf returns the backing slice of the BatchBuffer. The slice is returned as
+// allocated (full capacity length), not truncated to the number of added items,
+// so slots that were never filled are nil.
+func (b *BatchBuffer) Buf() []*event.OutputEventContext {
+ b.Lock()
+ defer b.Unlock()
+ return b.buf
+}
+
+// First returns the offset of the first OutputEventContext that was added; it
+// is zero while the buffer is empty.
+func (b *BatchBuffer) First() int64 {
+ b.Lock()
+ defer b.Unlock()
+ return b.first
+}
+
+// Len returns the number of items currently stored in the buffer.
+func (b *BatchBuffer) Len() int {
+ b.Lock()
+ defer b.Unlock()
+ return b.size
+}
+
+// BatchSize returns the offset increment covered by the cached data, that is
+// last - first + 1. An empty buffer covers no offsets, so it reports 0.
+func (b *BatchBuffer) BatchSize() int {
+	b.Lock()
+	defer b.Unlock()
+	// Without this guard an empty buffer (first == last == 0) would report 1.
+	if b.size == 0 {
+		return 0
+	}
+	return int(b.last - b.first + 1)
+}
+
+// Add appends data to the buffer and records the offsets of the first and the
+// most recently added items. The item is dropped, with an error log, when the
+// buffer is already full or when its offset is not positive.
+func (b *BatchBuffer) Add(data *event.OutputEventContext) {
+	b.Lock()
+	defer b.Unlock()
+	if b.size == b.cap {
+		log.Logger.Errorf("cannot add one item to the fulling BatchBuffer, the capacity is %d", b.cap)
+		return
+	}
+	if data.Offset <= 0 {
+		log.Logger.Errorf("cannot add one item to BatchBuffer because the input data is illegal, the offset is %d", data.Offset)
+		return
+	}
+	if b.size == 0 {
+		b.first = data.Offset
+	}
+	// last has to be updated for the very first item as well; otherwise a buffer
+	// holding a single item keeps last at zero and BatchSize computes a wrong
+	// (possibly negative) increment.
+	b.last = data.Offset
+	b.buf[b.size] = data
+	b.size++
+}
diff --git a/plugins/client/example/client_test.go b/internal/satellite/module/buffer/buffer_test.go
similarity index 54%
rename from plugins/client/example/client_test.go
rename to internal/satellite/module/buffer/buffer_test.go
index a9e355e..be9e9a8 100644
--- a/plugins/client/example/client_test.go
+++ b/internal/satellite/module/buffer/buffer_test.go
@@ -15,57 +15,58 @@
// specific language governing permissions and limitations
// under the License.
-package example
+package buffer
import (
"testing"
- "github.com/apache/skywalking-satellite/internal/pkg/plugin"
- "github.com/apache/skywalking-satellite/plugins/client/api"
+ "github.com/apache/skywalking-satellite/internal/pkg/event"
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
)
-func Test_Register(t *testing.T) {
+// TestNewBuffer adds five items to one shared BatchBuffer of capacity 3 and
+// checks Len after each Add. The cases depend on running in order: the first
+// three grow the buffer, the last two must leave Len at 3.
+func TestNewBuffer(t *testing.T) {
+ buffer := NewBatchBuffer(3)
tests := []struct {
- name string
- args interface{}
- panic bool
+ name string
+ args *event.OutputEventContext
+ want int
}{
{
- name: "demoClient",
- args: demoClient{
- a: "s",
- },
- panic: false,
+ name: "add-1",
+ args: &event.OutputEventContext{Offset: 1},
+ want: 1,
},
{
- name: "demoClient2",
- args: &demoClient2{
- a: "s",
- },
- panic: false,
+ name: "add-2",
+ args: &event.OutputEventContext{Offset: 2},
+ want: 2,
},
{
- name: "demoClient3",
- args: demoClient3{
- a: "s",
- },
- panic: true,
+ name: "add-3",
+ args: &event.OutputEventContext{Offset: 3},
+ want: 3,
+ },
+ {
+ name: "add-4",
+ args: &event.OutputEventContext{Offset: 4},
+ want: 3,
+ },
+ {
+ name: "add-5",
+ args: &event.OutputEventContext{Offset: 5},
+ want: 3,
},
}
-
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- plugin.RegisterPlugin(tt.name, tt.args)
- assertPanic(t, tt.name, nil, tt.panic)
+ buffer.Add(tt.args)
+ if got := buffer.Len(); got != tt.want {
+ t.Errorf("Buffer Len() = %v, want %v", got, tt.want)
+ }
})
}
}
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
- defer func() {
- if r := recover(); r != nil && !existPanic {
- t.Errorf("the plugin %s is not pass", name)
- }
- }()
- api.GetClient(name, config)
+// init initializes the global logger with an empty LoggerConfig, since
+// BatchBuffer.Add logs through log.Logger when it rejects an item.
+func init() {
+ log.Init(&log.LoggerConfig{})
}
diff --git a/internal/satellite/module/clientmanager.go b/internal/satellite/module/clientmanager.go
new file mode 100644
index 0000000..3d98cac
--- /dev/null
+++ b/internal/satellite/module/clientmanager.go
@@ -0,0 +1,182 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package module
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/constant"
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+ "github.com/apache/skywalking-satellite/internal/satellite/config"
+ client "github.com/apache/skywalking-satellite/plugins/client/api"
+)
+
+// The specific client statuses. The zero value is skipped with the blank
+// identifier, so Connected is 1 and Disconnect is 2, and a zero ClientStatus
+// matches neither of them.
+const (
+ _ ClientStatus = iota
+ Connected
+ Disconnect
+)
+
+// ClientStatus represents the status of the client.
+type ClientStatus int8
+
+// ClientManager is a module plugin to control the connection with the outer service.
+// The embedded mutex is taken by the methods that read or change the status and
+// the listener list.
+type ClientManager struct {
+ sync.Mutex
+
+ // config
+ config *config.ClientManagerConfig
+
+ // dependency plugins
+ runningClient client.Client
+
+ // self components
+ listeners []chan ClientStatus // the sender client status listeners
+
+ // Metrics:
+ retryCount int64 // incremented on every connection attempt in connectClient
+ status ClientStatus // the last status set by connectClient or ReportError
+}
+
+// Name returns the ClientManager module name constant.
+func (c *ClientManager) Name() string {
+ return constant.ClientManagerModule
+}
+
+// Description returns a short human-readable description of the module.
+func (c *ClientManager) Description() string {
+ return "keep connection with external services, such as Kafka and OAP."
+}
+
+// Config returns the ClientManagerConfig stored by Init as a ModuleConfig.
+func (c *ClientManager) Config() config.ModuleConfig {
+ return c.config
+}
+
+// Init stores the module config, creates the client plugin from its
+// ClientConfig and resets the listener list. cfg must be a
+// *config.ClientManagerConfig; any other type makes the type assertion panic.
+func (c *ClientManager) Init(cfg config.ModuleConfig) {
+	// The config has to be assigned before it is used by the log call below;
+	// calling NameSpace on the still-nil config would dereference a nil pointer.
+	c.config = cfg.(*config.ClientManagerConfig)
+	log.Logger.Infof("%s module of %s namespace is being initialized", c.Name(), c.config.NameSpace())
+	c.runningClient = client.GetClient(c.config.ClientConfig)
+	c.listeners = []chan ClientStatus{}
+}
+
+// Prepare makes the connection with external services, such as Kafka and OAP,
+// by delegating to initializeClient and returning its error.
+func (c *ClientManager) Prepare() error {
+ log.Logger.Infof("%s module of %s namespace is in preparing stage", c.Name(), c.config.NameSpace())
+ return c.initializeClient()
+}
+
+// Boot maintains the connection with external services: every RetryInterval
+// milliseconds it calls connectClient and logs a failure. It blocks until ctx
+// is cancelled, at which point Shutdown is called and Boot returns.
+func (c *ClientManager) Boot(ctx context.Context) {
+	log.Logger.Infof("%s module of %s namespace is running", c.Name(), c.config.NameSpace())
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		// NOTE(review): time.NewTicker panics on a non-positive duration, so
+		// retry_interval has to be configured with a positive value.
+		timeTicker := time.NewTicker(time.Duration(c.config.RetryInterval) * time.Millisecond)
+		// Stop the ticker when the loop exits so its resources are released.
+		defer timeTicker.Stop()
+		for {
+			select {
+			case <-timeTicker.C:
+				if err := c.connectClient(); err != nil {
+					// Include the cause; without it the log cannot explain why
+					// the connection failed.
+					log.Logger.Errorf("cannot make a connection with the %s client, error is: %v", c.runningClient.Name(), err)
+				}
+			case <-ctx.Done():
+				c.Shutdown()
+				return
+			}
+		}
+	}()
+	wg.Wait()
+}
+
+// Shutdown closes every registered listener channel and then closes the running
+// client, logging an error if the client fails to close.
+func (c *ClientManager) Shutdown() {
+	log.Logger.Infof("%s module of %s namespace is closing", c.Name(), c.config.NameSpace())
+	// The listener list is guarded by the mutex (see RegisterListener), so it is
+	// read under the lock here as well.
+	c.Lock()
+	for _, listener := range c.listeners {
+		close(listener)
+	}
+	// Forget the closed channels: closing a channel twice or sending on a closed
+	// channel panics, which a repeated Shutdown or a later status notification
+	// would otherwise trigger.
+	c.listeners = nil
+	c.Unlock()
+	if err := c.runningClient.Close(); err != nil {
+		log.Logger.Errorf("an error occurring when closing %s client: %v", c.runningClient.Name(), err)
+	}
+}
+
+// RegisterListener appends the listener channel to the list of channels that
+// receive the client status. The list is modified under the mutex.
+func (c *ClientManager) RegisterListener(listener chan ClientStatus) {
+ c.Lock()
+ defer c.Unlock()
+ c.listeners = append(c.listeners, listener)
+}
+
+// GetConnectedClient returns whatever the running client plugin reports from
+// its own GetConnectedClient method.
+func (c *ClientManager) GetConnectedClient() interface{} {
+ return c.runningClient.GetConnectedClient()
+}
+
+// ReportError reports that the client is disconnected. When the current status
+// is Connected it is switched to Disconnect and all listeners are notified;
+// otherwise the call does nothing.
+func (c *ClientManager) ReportError() {
+	c.Lock()
+	changed := c.status == Connected
+	if changed {
+		c.status = Disconnect
+	}
+	c.Unlock()
+	// notify acquires the mutex itself and sync.Mutex is not reentrant, so the
+	// lock must be released before notify is called to avoid a self-deadlock.
+	if changed {
+		c.notify()
+	}
+}
+
+// initializeClient makes the first connection with the external service. When
+// the first attempt fails it waits 10 seconds and retries exactly once,
+// returning the result of that second attempt.
+func (c *ClientManager) initializeClient() error {
+	// The mutex is deliberately not taken here: connectClient locks it itself,
+	// and sync.Mutex is not reentrant, so holding it across the call would
+	// deadlock.
+	if err := c.connectClient(); err != nil {
+		log.Logger.Infof("preparing to reconnect with %s client,retrying in 10s", c.runningClient.Name())
+		time.Sleep(10 * time.Second)
+		return c.connectClient()
+	}
+	return nil
+}
+
+// connectClient makes a connection with external services, such as Kafka and
+// OAP, unless the running client already reports being connected. On a
+// successful connect the status becomes Connected and all listeners are
+// notified. The error returned by the client's Connect is passed back.
+func (c *ClientManager) connectClient() error {
+	c.Lock()
+	if c.runningClient.IsConnected() {
+		c.Unlock()
+		return nil
+	}
+	log.Logger.Infof("preparing to connect with %s client", c.runningClient.Name())
+	c.retryCount++
+	err := c.runningClient.Connect()
+	if err == nil {
+		c.status = Connected
+	}
+	c.Unlock()
+	if err != nil {
+		return err
+	}
+	// notify acquires the mutex itself and sync.Mutex is not reentrant, so the
+	// listeners are notified only after the lock has been released.
+	c.notify()
+	log.Logger.Infof("successfully connected to %s client", c.runningClient.Name())
+	return nil
+}
+
+// notify sends the current status to every registered listener channel while
+// holding the mutex. Each send blocks until that listener receives (or the
+// channel has buffer space). Because sync.Mutex is not reentrant, the caller
+// must not already hold the ClientManager mutex when calling notify.
+func (c *ClientManager) notify() {
+ c.Lock()
+ defer c.Unlock()
+ for _, listener := range c.listeners {
+ listener <- c.status
+ }
+}
diff --git a/internal/satellite/module/definition.go b/internal/satellite/module/definition.go
new file mode 100644
index 0000000..6348f03
--- /dev/null
+++ b/internal/satellite/module/definition.go
@@ -0,0 +1,98 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package module
+
+import (
+ "context"
+ "reflect"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+ "github.com/apache/skywalking-satellite/internal/satellite/config"
+)
+
+const (
+ // SharingNamespaceName is the namespace whose modules are shared with all other namespaces:
+ // GetRunningModule falls back to it when the requested namespace has no such module.
+ SharingNamespaceName = "sharing"
+)
+
+// TODO add metrics func
+// Service is a custom plugin interface, which defines the module lifecycle:
+// Init, Prepare, Boot and Shutdown.
+type Service interface {
+ plugin.Plugin
+
+ // Config returns the ModuleConfig in use.
+ Config() config.ModuleConfig
+
+ // Init initializes the module from the given config.
+ // In this stage, no components are running.
+ Init(config config.ModuleConfig)
+
+ // Prepare would inject the dependency modules and do dependency initialization.
+ // A non-nil error reports that the module could not be prepared.
+ Prepare() error
+
+ // Boot starts the module. It has no return value; the context is used to
+ // signal the module to stop.
+ Boot(ctx context.Context)
+
+ // Shutdown could do some clean job to close Service.
+ Shutdown()
+}
+
+// moduleContainer stores the registered services keyed by namespace + module name.
+var moduleContainer map[string]Service
+
+// NewModuleService obtains the Service plugin matching cfg through plugin.Get and
+// discards the returned value; the function itself returns nothing.
+// NOTE(review): initialization and registration to the module container are
+// presumably done by the callbacks registered in init — confirm in plugin.Get.
+func NewModuleService(cfg config.ModuleConfig) {
+ _ = plugin.Get(reflect.TypeOf((*Service)(nil)).Elem(), cfg).(Service)
+}
+
+// GetRunningModule looks up the Service registered for moduleName, first in the
+// given namespace and then in the sharing namespace. It returns nil when
+// neither namespace has the module.
+func GetRunningModule(namespace, moduleName string) Service {
+	for _, ns := range [...]string{namespace, SharingNamespaceName} {
+		if svc, found := moduleContainer[ns+moduleName]; found {
+			return svc
+		}
+	}
+	return nil
+}
+
+// GetModuleContainer returns the module container map itself, not a copy.
+func GetModuleContainer() map[string]Service {
+ return moduleContainer
+}
+
+// init creates the module container and registers the Service category to the
+// plugin registry together with three callbacks: one that derives the plugin
+// name from the config, one that initializes a Service, and one that stores the
+// initialized Service in the module container.
+func init() {
+	moduleContainer = make(map[string]Service)
+	serviceCategory := reflect.TypeOf((*Service)(nil)).Elem()
+	// The module name selects the specific Service plugin.
+	nameFinder := func(cfg interface{}) string {
+		return cfg.(config.ModuleConfig).ModuleName()
+	}
+	// Initialize the Service with its module config.
+	initializer := func(p plugin.Plugin, cfg interface{}) {
+		svc := p.(Service)
+		moduleCfg := cfg.(config.ModuleConfig)
+		svc.Init(moduleCfg)
+	}
+	// Store the initialized Service under namespace + module name.
+	registrar := func(p plugin.Plugin) {
+		svc := p.(Service)
+		moduleContainer[svc.Config().NameSpace()+svc.Config().ModuleName()] = svc
+	}
+	plugin.RegisterPluginCategory(serviceCategory, nameFinder, initializer, registrar)
+}
diff --git a/internal/satellite/module/gatherer.go b/internal/satellite/module/gatherer.go
new file mode 100644
index 0000000..765f6cc
--- /dev/null
+++ b/internal/satellite/module/gatherer.go
@@ -0,0 +1,112 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package module
+
+import (
+ "context"
+ "sync"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/constant"
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+ "github.com/apache/skywalking-satellite/internal/satellite/config"
+ collector "github.com/apache/skywalking-satellite/plugins/collector/api"
+ queue "github.com/apache/skywalking-satellite/plugins/queue/api"
+)
+
+// Gatherer is the APM data collection module in Satellite. It moves events from
+// its collector plugin into its queue plugin.
+type Gatherer struct {
+ // config
+ config *config.GathererConfig
+
+ // dependency plugins
+ runningCollector collector.Collector
+ runningQueue queue.Queue
+}
+
+// Name returns the Gatherer module name constant.
+func (g *Gatherer) Name() string {
+ return constant.GathererModule
+}
+
+// Description returns a short human-readable description of the module.
+func (g *Gatherer) Description() string {
+ return "gatherer is the APM data collection module in Satellite, which supports Log, Trace, and Metrics scopes."
+}
+
+// Config returns the GathererConfig stored by Init as a ModuleConfig.
+func (g *Gatherer) Config() config.ModuleConfig {
+ return g.config
+}
+
+// Init stores the module config and creates the collector and queue plugins
+// from their respective plugin configs. cfg must be a *config.GathererConfig;
+// any other type makes the type assertion panic.
+func (g *Gatherer) Init(cfg config.ModuleConfig) {
+	// The config has to be assigned before it is used by the log call below;
+	// calling NameSpace on the still-nil config would dereference a nil pointer.
+	g.config = cfg.(*config.GathererConfig)
+	log.Logger.Infof("%s module of %s namespace is being initialized", g.Name(), g.config.NameSpace())
+	g.runningCollector = collector.GetCollector(g.config.CollectorConfig)
+	// The queue plugin is built from the queue section, not the collector one.
+	g.runningQueue = queue.GetQueue(g.config.QueueConfig)
+}
+
+// Prepare calls Prepare on the collector plugin and then on the queue plugin.
+// The first failure is logged and returned; the queue is not prepared when the
+// collector fails.
+func (g *Gatherer) Prepare() error {
+	ns := g.config.NameSpace()
+	log.Logger.Infof("%s module of %s namespace is in preparing stage", g.Name(), ns)
+
+	collectorErr := g.runningCollector.Prepare()
+	if collectorErr != nil {
+		log.Logger.Infof("%s collector of %s namespace was failed to initialize", g.runningCollector.Name(), ns)
+		return collectorErr
+	}
+
+	queueErr := g.runningQueue.Prepare()
+	if queueErr != nil {
+		log.Logger.Infof("the %s queue of %s namespace was failed to initialize", g.runningQueue.Name(), ns)
+		return queueErr
+	}
+	return nil
+}
+
+// Boot fetches Collector input data and pushes it into Queue. It blocks until
+// ctx is cancelled, at which point Shutdown is called and Boot returns. Events
+// that cannot be enqueued are logged and dropped.
+func (g *Gatherer) Boot(ctx context.Context) {
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		// Without Done the Wait below never returns, so Boot would hang forever
+		// even after the context has been cancelled.
+		defer wg.Done()
+		for {
+			select {
+			case e := <-g.runningCollector.EventChannel():
+				err := g.runningQueue.Publisher().Enqueue(e)
+				if err != nil {
+					// todo add abandonedCount metrics
+					log.Logger.Errorf("cannot put event into queue in %s namespace, error is: %v", g.config.NameSpace(), err)
+				}
+			case <-ctx.Done():
+				g.Shutdown()
+				return
+			}
+		}
+	}()
+	wg.Wait()
+}
+
+// Shutdown closes the collector plugin and then the queue plugin. A failure to
+// close either one is logged; the queue is closed even when the collector fails.
+func (g *Gatherer) Shutdown() {
+	ns := g.config.NameSpace()
+	log.Logger.Infof("%s module of %s namespace is closing", g.Name(), ns)
+
+	collectorErr := g.runningCollector.Close()
+	if collectorErr != nil {
+		log.Logger.Errorf("failure occurs when closing %s collector in %s namespace, error is: %v", g.runningCollector.Name(), ns, collectorErr)
+	}
+
+	queueErr := g.runningQueue.Close()
+	if queueErr != nil {
+		log.Logger.Errorf("failure occurs when closing %s queue in %s namespace, error is: %v", g.runningQueue.Name(), ns, queueErr)
+	}
+}
+
+// Ack acknowledges events on the queue according to the startOffset and the
+// batchSize. It blocks until a value is received from (or the close of) the
+// channel returned by the queue's Ack.
+func (g *Gatherer) Ack(startOffset int64, batchSize int) {
+ <-g.runningQueue.Ack(startOffset, batchSize)
+}
diff --git a/internal/satellite/module/processor.go b/internal/satellite/module/processor.go
new file mode 100644
index 0000000..e1cdf96
--- /dev/null
+++ b/internal/satellite/module/processor.go
@@ -0,0 +1,115 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package module
+
+import (
+ "context"
+ "sync"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/constant"
+ "github.com/apache/skywalking-satellite/internal/pkg/event"
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+ "github.com/apache/skywalking-satellite/internal/satellite/config"
+ filter "github.com/apache/skywalking-satellite/plugins/filter/api"
+)
+
+// Processor is the processing module in Satellite. Its Boot method dequeues
+// events from the Gatherer's queue, passes them through runningFilters and
+// hands the resulting context to the Sender's Input channel.
+type Processor struct {
+ // config is the module configuration; it is assigned in Init.
+ config *config.ProcessorConfig
+
+ // runningFilters are the filter plugins created in Init, applied in slice order.
+ runningFilters []filter.Filter
+
+ // sender and gatherer are the dependency modules resolved in Prepare.
+ sender *Sender
+ gatherer *Gatherer
+}
+
+// Name returns the module name, constant.ProcessorModule.
+func (p *Processor) Name() string {
+ return constant.ProcessorModule
+}
+
+// Description returns a fixed, human-readable summary of the processor module.
+func (p *Processor) Description() string {
+ return "processor module is the core processing module in Satellite"
+}
+
+// Config returns the configuration held by the processor (the value assigned in Init).
+func (p *Processor) Config() config.ModuleConfig {
+ return p.config
+}
+
+// Init binds the processor configuration and creates one filter plugin for
+// every entry of the configuration's FilterConfig, keeping the configured order.
+//
+// cfg must hold a *config.ProcessorConfig; any other dynamic type makes the
+// type assertion panic.
+func (p *Processor) Init(cfg config.ModuleConfig) {
+	// Assign the config before logging: the log call dereferences p.config,
+	// which nothing has set before Init runs.
+	p.config = cfg.(*config.ProcessorConfig)
+	log.Logger.Infof("%s module of %s namespace is being initialized", p.Name(), p.config.NameSpace())
+	p.runningFilters = make([]filter.Filter, 0, len(p.config.FilterConfig))
+	for _, c := range p.config.FilterConfig {
+		p.runningFilters = append(p.runningFilters, filter.GetFilter(c))
+	}
+}
+
+// Prepare injects the dependency modules: it looks up the running Sender and
+// Gatherer of the processor's namespace and stores them on the processor.
+// It always returns nil; a lookup result of an unexpected type makes the type
+// assertion panic.
+func (p *Processor) Prepare() error {
+	log.Logger.Infof("%s module of %s namespace is in preparing stage", p.Name(), p.config.NameSpace())
+
+	senderModule := GetRunningModule(p.config.NameSpace(), constant.SenderModule)
+	p.sender = senderModule.(*Sender)
+
+	gathererModule := GetRunningModule(p.config.NameSpace(), constant.GathererModule)
+	p.gatherer = gathererModule.(*Gatherer)
+
+	return nil
+}
+
+// Boot fetches the data of Queue, does a series of processing, and then sends to Sender.
+//
+// It starts one goroutine and blocks until it exits. For every dequeued event
+// the goroutine builds an OutputEventContext carrying the queue offset, lets
+// each running filter process it in order, and offers the context to the
+// Sender's Input channel. The goroutine calls Shutdown and exits once ctx is done.
+func (p *Processor) Boot(ctx context.Context) {
+	log.Logger.Infof("%s module of %s namespace is running", p.Name(), p.config.NameSpace())
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	go func() {
+		defer wg.Done()
+		for {
+			// Observe cancellation before every dequeue. The error path below
+			// restarts the loop without reaching the select at the bottom, so
+			// without this check a queue that keeps failing would keep the
+			// goroutine spinning after ctx is done.
+			select {
+			case <-ctx.Done():
+				p.Shutdown()
+				return
+			default:
+			}
+
+			// fetch a new event from Queue of Gatherer
+			e, offset, err := p.gatherer.runningQueue.Consumer().Dequeue()
+			if err != nil {
+				// todo add metrics
+				log.Logger.Errorf("cannot get event from queue in %s namespace, error is: %v",
+					p.config.NameSpace(), err)
+				continue
+			}
+			c := &event.OutputEventContext{
+				Offset:  offset,
+				Context: make(map[string]event.Event),
+			}
+			// processing the event with filters, that put the necessary events to OutputEventContext.
+			c.Put(e)
+			for _, f := range p.runningFilters {
+				f.Process(c)
+			}
+
+			select {
+			// put the result into the Input channel of Sender
+			case p.sender.Input <- c:
+			case <-ctx.Done():
+				p.Shutdown()
+				return
+			}
+		}
+	}()
+	wg.Wait()
+}
+
+// Shutdown logs that the processor module is closing; it executes no other statement.
+func (p *Processor) Shutdown() {
+ log.Logger.Infof("%s module of %s namespace is closing", p.Name(), p.config.NameSpace())
+}
diff --git a/internal/satellite/module/sender.go b/internal/satellite/module/sender.go
new file mode 100644
index 0000000..54ac3ac
--- /dev/null
+++ b/internal/satellite/module/sender.go
@@ -0,0 +1,181 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package module
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/constant"
+ "github.com/apache/skywalking-satellite/internal/pkg/event"
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+ "github.com/apache/skywalking-satellite/internal/satellite/config"
+ "github.com/apache/skywalking-satellite/internal/satellite/module/buffer"
+ fallbacker "github.com/apache/skywalking-satellite/plugins/fallbacker/api"
+ forwarder "github.com/apache/skywalking-satellite/plugins/forwarder/api"
+)
+
+// Sender is the forward module in Satellite. Boot collects incoming
+// OutputEventContext values into a BatchBuffer and hands full or timed-out
+// buffers to consume, which forwards them through the running forwarders.
+type Sender struct {
+ // config is the module configuration; it is assigned in Init.
+ config *config.SenderConfig
+
+ // dependency plugins, created in Init
+ runningForwarders []forwarder.Forwarder
+ runningFallbacker fallbacker.Fallbacker
+
+ // dependency modules, resolved in Prepare
+ gatherer *Gatherer
+ clientManager *ClientManager
+
+ // self components
+ Input chan *event.OutputEventContext // logic input channel: Boot sets it to input on Connected and to nil on Disconnect
+ input chan *event.OutputEventContext // physical input channel, created in Init
+ listener chan ClientStatus // client status listener, registered with the clientManager in Prepare
+ flushChannel chan *buffer.BatchBuffer // forwarder flush channel, created in Init with capacity 1
+ buffer *buffer.BatchBuffer // cache the downstream input data
+}
+
+// Name returns the module name, constant.SenderModule.
+func (s *Sender) Name() string {
+ return constant.SenderModule
+}
+
+// Description returns a fixed, human-readable summary of the sender module.
+func (s *Sender) Description() string {
+ return "forward the input events to external services, such as Kafka and OAP"
+}
+
+// Config returns the configuration held by the sender (the value assigned in Init).
+func (s *Sender) Config() config.ModuleConfig {
+ return s.config
+}
+
+// Init binds the sender configuration, creates the fallbacker and one
+// forwarder plugin per ForwardersConfig entry, and allocates the sender's
+// own channels and batch buffer.
+//
+// cfg must hold a *config.SenderConfig; any other dynamic type makes the
+// type assertion panic.
+func (s *Sender) Init(cfg config.ModuleConfig) {
+	// Assign the config before logging: the log call dereferences s.config,
+	// which nothing has set before Init runs.
+	s.config = cfg.(*config.SenderConfig)
+	log.Logger.Infof("%s module of %s namespace is being initialized", s.Name(), s.config.NameSpace())
+	s.runningFallbacker = fallbacker.GetFallbacker(s.config.FallbackerConfig)
+	s.runningForwarders = make([]forwarder.Forwarder, 0, len(s.config.ForwardersConfig))
+	for _, c := range s.config.ForwardersConfig {
+		s.runningForwarders = append(s.runningForwarders, forwarder.GetForwarder(c))
+	}
+	s.input = make(chan *event.OutputEventContext)
+	// The logic channel starts out pointing at the physical one.
+	s.Input = s.input
+	s.listener = make(chan ClientStatus)
+	s.flushChannel = make(chan *buffer.BatchBuffer, 1)
+	s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
+}
+
+// Prepare injects the dependency modules and registers the sender's client
+// status listener with the ClientManager. It looks up the running
+// ClientManager and Gatherer of the sender's namespace; a lookup result of an
+// unexpected type makes the type assertion panic. It always returns nil.
+func (s *Sender) Prepare() error {
+	log.Logger.Infof("%s module of %s namespace is in preparing stage", s.Name(), s.config.NameSpace())
+
+	managerModule := GetRunningModule(s.config.NameSpace(), constant.ClientManagerModule)
+	s.clientManager = managerModule.(*ClientManager)
+
+	gathererModule := GetRunningModule(s.config.NameSpace(), constant.GathererModule)
+	s.gatherer = gathererModule.(*Gatherer)
+
+	s.clientManager.RegisterListener(s.listener)
+	return nil
+}
+
+// Boot fetches the downstream input data and forward to external services, such as Kafka and OAP receiver.
+//
+// It starts two goroutines and blocks until both have exited:
+//   - the buffering goroutine reacts to client status changes, collects input
+//     events into the batch buffer, and pushes the buffer to flushChannel when
+//     it is full or when the flush ticker fires with more than MinFlushEvents
+//     buffered;
+//   - the flushing goroutine consumes every buffer received from flushChannel
+//     and calls Shutdown once ctx is done.
+func (s *Sender) Boot(ctx context.Context) {
+	log.Logger.Infof("%s module of %s namespace is running", s.Name(), s.config.NameSpace())
+	var wg sync.WaitGroup
+	wg.Add(2)
+	// 1. keep fetching the downstream data when client connected, and put it into BatchBuffer.
+	// 2. When reaches the buffer limit or receives a timer flush signal, and put BatchBuffer into flushChannel.
+	go func() {
+		defer wg.Done()
+		timeTicker := time.NewTicker(time.Duration(s.config.FlushTime) * time.Millisecond)
+		// Release the ticker when this goroutine exits.
+		defer timeTicker.Stop()
+		for {
+			select {
+			case status := <-s.listener:
+				switch status {
+				case Connected:
+					log.Logger.Infof("%s module of %s namespace is notified the connection is connected", s.Name(), s.config.NameSpace())
+					s.Input = s.input
+				case Disconnect:
+					log.Logger.Infof("%s module of %s namespace is notified the connection is disconnected", s.Name(), s.config.NameSpace())
+					// A nil channel is never ready, so the Input case below is disabled.
+					s.Input = nil
+				}
+			case <-timeTicker.C:
+				if s.buffer.Len() > s.config.MinFlushEvents {
+					s.flushChannel <- s.buffer
+					s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
+				}
+			case e := <-s.Input:
+				s.buffer.Add(e)
+				if s.buffer.Len() == s.config.MaxBufferSize {
+					s.flushChannel <- s.buffer
+					s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
+				}
+			case <-ctx.Done():
+				s.Input = nil
+				return
+			}
+		}
+	}()
+	// Keep fetching BatchBuffer to forward.
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case b := <-s.flushChannel:
+				s.consume(b)
+			case <-ctx.Done():
+				s.Shutdown()
+				return
+			}
+		}
+	}()
+	wg.Wait()
+}
+
+// Shutdown closes the channels and tries to force forward the events in the buffer.
+//
+// It closes the physical input channel, consumes every batch already queued
+// in flushChannel, consumes the current buffer, and finally closes flushChannel.
+func (s *Sender) Shutdown() {
+	log.Logger.Infof("%s module of %s namespace is closing", s.Name(), s.config.NameSpace())
+	close(s.input)
+	// Drain only what is already queued. Ranging over flushChannel here would
+	// block forever once it is empty, because the channel is closed only
+	// after the drain has finished.
+drain:
+	for {
+		select {
+		case buf := <-s.flushChannel:
+			s.consume(buf)
+		default:
+			break drain
+		}
+	}
+	s.consume(s.buffer)
+	close(s.flushChannel)
+}
+
+// consume would forward the events by type and ack this batch.
+func (s *Sender) consume(batch *buffer.BatchBuffer) {
+ log.Logger.Infof("%s module of %s namespace is flushing a new batch buffer. the start offset is %d, and the batch size is %d",
+ s.Name(), s.config.NameSpace(), batch.First(), batch.BatchSize())
+ var events = make(map[event.Type]event.BatchEvents)
+ for i := 0; i < batch.Len(); i++ {
+ eventContext := batch.Buf()[i]
+ for _, e := range eventContext.Context {
+ if e.IsRemote() {
+ events[e.Type()] = append(events[e.Type()], e)
+ }
+ }
+ }
+
+ for _, f := range s.runningForwarders {
+ for t, batchEvents := range events {
+ if f.ForwardType() == t {
+ s.runningFallbacker.FallBack(batchEvents, s.clientManager.GetConnectedClient(), f.Forward, func() {
+ s.clientManager.ReportError()
+ })
+ }
+ }
+ }
+ s.gatherer.Ack(batch.First(), batch.BatchSize())
+}
diff --git a/internal/pkg/plugin/define.go b/plugins/README.md
similarity index 81%
rename from internal/pkg/plugin/define.go
rename to plugins/README.md
index ad95698..c67bca2 100644
--- a/internal/pkg/plugin/define.go
+++ b/plugins/README.md
@@ -1,22 +1,4 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package plugin
-
+```
// The following graph illustrates the relationship between different plugin interface in api package.
//
//
@@ -75,10 +57,4 @@ package plugin
// There are four stages in the lifecycle of Satellite plugins, which are the initial phase,
// preparing phase, running phase, and closing phase. In the running phase, each plugin has
// its own interface definition. However, the other three phases have to be defined uniformly.
-
-type Plugin interface {
- // Description returns the description of the specific plugin.
- Description() string
- // Init initialize the specific plugin.
- InitPlugin(config map[string]interface{})
-}
+```
\ No newline at end of file
diff --git a/plugins/client/api/client.go b/plugins/client/api/client.go
index 51d2b65..33f61a6 100644
--- a/plugins/client/api/client.go
+++ b/plugins/client/api/client.go
@@ -27,21 +27,24 @@ import (
type Client interface {
plugin.Plugin
- // Prepare would make connection with outer service.
- Prepare()
+ // Connect would make connection with outer service.
+ Connect() error
+
+ // Return the status of the client.
+ IsConnected() bool
+
// GetConnection returns the connected client to publish events.
GetConnectedClient() interface{}
+
// Close the connection with outer service.
- Close()
+ Close() error
}
-var ClientCategory = reflect.TypeOf((*Client)(nil)).Elem()
-
// Get client plugin.
-func GetClient(pluginName string, config map[string]interface{}) Client {
- return plugin.Get(ClientCategory, pluginName, config).(Client)
+func GetClient(config plugin.DefaultConfig) Client {
+ return plugin.Get(reflect.TypeOf((*Client)(nil)).Elem(), config).(Client)
}
func init() {
- plugin.AddPluginCategory(ClientCategory)
+ plugin.RegisterPluginCategory(reflect.TypeOf((*Client)(nil)).Elem(), nil, nil, nil)
}
diff --git a/plugins/client/example/client.go b/plugins/client/example/client.go
deleted file mode 100644
index 77b29c7..0000000
--- a/plugins/client/example/client.go
+++ /dev/null
@@ -1,67 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-type demoClient struct {
- a string
-}
-type demoClient2 struct {
- a string
-}
-type demoClient3 struct {
- a string
-}
-
-func (d demoClient) Description() string {
- panic("implement me")
-}
-
-func (d demoClient) InitPlugin(config map[string]interface{}) {
-
-}
-
-func (d demoClient) Prepare() {
- panic("implement me")
-}
-
-func (d demoClient) GetConnectedClient() interface{} {
- panic("implement me")
-}
-
-func (d demoClient) Close() {
- panic("implement me")
-}
-
-func (d *demoClient2) Description() string {
- panic("implement me")
-}
-
-func (d *demoClient2) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoClient2) Prepare() {
- panic("implement me")
-}
-
-func (d *demoClient2) GetConnectedClient() interface{} {
- panic("implement me")
-}
-
-func (d *demoClient2) Close() {
- panic("implement me")
-}
diff --git a/plugins/collector/api/collector.go b/plugins/collector/api/collector.go
index 374d63a..a756974 100644
--- a/plugins/collector/api/collector.go
+++ b/plugins/collector/api/collector.go
@@ -27,7 +27,7 @@ import (
// Init() Initial stage: Init plugin by config
// ||
// \/
-// Prepare() Preparing stage: Prepare the collector, such as build connection with SkyWalking javaagent.
+// Prepare() Preparing stage: Prepare the collector, such as building a connection with the SkyWalking javaagent.
// ||
// \/
// Next() Running stage: When Collector collect a data, the data would be fetched by the upstream
@@ -38,22 +38,21 @@ import (
// Collector is a plugin interface, that defines new collectors.
type Collector interface {
plugin.Plugin
-
- // Prepare creates a listen or reader to gather data.
- Prepare()
+ // Prepare creates a listener or reader to gather APM data.
+ Prepare() error
// Next return the data from the input.
- Next() (event.SerializableEvent, error)
+ EventChannel() <-chan event.SerializableEvent
// Close would close collector.
- Close()
+ Close() error
}
var CollectorCategory = reflect.TypeOf((*Collector)(nil)).Elem()
// Get collector plugin.
-func GetCollector(pluginName string, config map[string]interface{}) Collector {
- return plugin.Get(CollectorCategory, pluginName, config).(Collector)
+func GetCollector(config plugin.DefaultConfig) Collector {
+ return plugin.Get(CollectorCategory, config).(Collector)
}
func init() {
- plugin.AddPluginCategory(CollectorCategory)
+ plugin.RegisterPluginCategory(CollectorCategory, nil, nil, nil)
}
diff --git a/plugins/collector/example/collector.go b/plugins/collector/example/collector.go
deleted file mode 100644
index 40b9b8b..0000000
--- a/plugins/collector/example/collector.go
+++ /dev/null
@@ -1,70 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import "github.com/apache/skywalking-satellite/internal/pkg/event"
-
-type demoCollector struct {
- a string
-}
-
-type demoCollector2 struct {
- a string
-}
-
-type demoCollector3 struct {
- a string
-}
-
-func (d *demoCollector) Description() string {
- panic("implement me")
-}
-
-func (d *demoCollector) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoCollector) Prepare() {
- panic("implement me")
-}
-
-func (d *demoCollector) Next() (event.SerializableEvent, error) {
- panic("implement me")
-}
-
-func (d *demoCollector) Close() {
- panic("implement me")
-}
-
-func (d demoCollector2) Description() string {
- panic("implement me")
-}
-
-func (d demoCollector2) InitPlugin(config map[string]interface{}) {
-}
-
-func (d demoCollector2) Prepare() {
- panic("implement me")
-}
-
-func (d demoCollector2) Next() (event.SerializableEvent, error) {
- panic("implement me")
-}
-
-func (d demoCollector2) Close() {
- panic("implement me")
-}
diff --git a/plugins/collector/example/collector_test.go b/plugins/collector/example/collector_test.go
deleted file mode 100644
index e73f3bd..0000000
--- a/plugins/collector/example/collector_test.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
- "testing"
-
- "github.com/apache/skywalking-satellite/internal/pkg/plugin"
- "github.com/apache/skywalking-satellite/plugins/collector/api"
-)
-
-func Test_Register(t *testing.T) {
- tests := []struct {
- name string
- args interface{}
- panic bool
- }{
- {
- name: "demoCollector",
- args: &demoCollector{
- a: "s",
- },
- panic: false,
- },
- {
- name: "demoCollector2",
- args: demoCollector2{
- a: "s",
- },
- panic: false,
- },
- {
- name: "demoCollector3",
- args: demoCollector3{
- a: "s",
- },
- panic: true,
- },
- }
-
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- plugin.RegisterPlugin(tt.name, tt.args)
- assertPanic(t, tt.name, nil, tt.panic)
- })
- }
-}
-
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
- defer func() {
- if r := recover(); r != nil && !existPanic {
- t.Errorf("the plugin %s is not pass", name)
- }
- }()
- api.GetCollector(name, config)
-}
diff --git a/plugins/fallbacker/api/fallbacker.go b/plugins/fallbacker/api/fallbacker.go
index bde56ef..91dea45 100644
--- a/plugins/fallbacker/api/fallbacker.go
+++ b/plugins/fallbacker/api/fallbacker.go
@@ -22,23 +22,24 @@ import (
"github.com/apache/skywalking-satellite/internal/pkg/event"
"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+ "github.com/apache/skywalking-satellite/plugins/forwarder/api"
)
// Fallbacker is a plugin interface, that defines some fallback strategies.
type Fallbacker interface {
plugin.Plugin
-
// FallBack returns nil when finishing a successful process and returns a new Fallbacker when failure.
- FallBack(batch event.BatchEvents) Fallbacker
+ FallBack(batch event.BatchEvents, connection interface{}, forward api.ForwardFunc, callback DisconnectionCallback) Fallbacker
}
-var FallbackerCategory = reflect.TypeOf((*Fallbacker)(nil)).Elem()
+type DisconnectionCallback func()
// Get Fallbacker plugin.
-func GetFallbacker(pluginName string, config map[string]interface{}) Fallbacker {
- return plugin.Get(FallbackerCategory, pluginName, config).(Fallbacker)
+func GetFallbacker(config plugin.DefaultConfig) Fallbacker {
+ return plugin.Get(reflect.TypeOf((*Fallbacker)(nil)).Elem(), config).(Fallbacker)
}
+// init register the Fallbacker interface
func init() {
- plugin.AddPluginCategory(FallbackerCategory)
+ plugin.RegisterPluginCategory(reflect.TypeOf((*Fallbacker)(nil)).Elem(), nil, nil, nil)
}
diff --git a/plugins/fallbacker/example/fallbacker.go b/plugins/fallbacker/example/fallbacker.go
deleted file mode 100644
index b933e96..0000000
--- a/plugins/fallbacker/example/fallbacker.go
+++ /dev/null
@@ -1,57 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
- "github.com/apache/skywalking-satellite/internal/pkg/event"
- "github.com/apache/skywalking-satellite/plugins/fallbacker/api"
-)
-
-type demoFallbacker struct {
- a string
-}
-
-type demoFallbacker2 struct {
- a string
-}
-
-type demoFallbacker3 struct {
- a string
-}
-
-func (d *demoFallbacker) Description() string {
- panic("implement me")
-}
-
-func (d *demoFallbacker) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoFallbacker) FallBack(batch event.BatchEvents) api.Fallbacker {
- panic("implement me")
-}
-
-func (d demoFallbacker2) Description() string {
- panic("implement me")
-}
-
-func (d demoFallbacker2) InitPlugin(config map[string]interface{}) {
-}
-
-func (d demoFallbacker2) FallBack(batch event.BatchEvents) api.Fallbacker {
- panic("implement me")
-}
diff --git a/plugins/fallbacker/example/fallbacker_test.go b/plugins/fallbacker/example/fallbacker_test.go
deleted file mode 100644
index f5f445f..0000000
--- a/plugins/fallbacker/example/fallbacker_test.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
- "testing"
-
- "github.com/apache/skywalking-satellite/internal/pkg/plugin"
- "github.com/apache/skywalking-satellite/plugins/fallbacker/api"
-)
-
-func Test_Register(t *testing.T) {
- tests := []struct {
- name string
- args interface{}
- panic bool
- }{
- {
- name: "demoFallbacker",
- args: &demoFallbacker{
- a: "s",
- },
- panic: false,
- },
- {
- name: "demoFallbacker2",
- args: demoFallbacker2{
- a: "s",
- },
- panic: false,
- },
- {
- name: "demoFallbacker3",
- args: demoFallbacker3{
- a: "s",
- },
- panic: true,
- },
- }
-
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- plugin.RegisterPlugin(tt.name, tt.args)
- assertPanic(t, tt.name, nil, tt.panic)
- })
- }
-}
-
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
- defer func() {
- if r := recover(); r != nil && !existPanic {
- t.Errorf("the plugin %s is not pass", name)
- }
- }()
- api.GetFallbacker(name, config)
-}
diff --git a/plugins/fallbacker/timer/timer_fallbacker.go b/plugins/fallbacker/timer/timer_fallbacker.go
new file mode 100644
index 0000000..b86981e
--- /dev/null
+++ b/plugins/fallbacker/timer/timer_fallbacker.go
@@ -0,0 +1,62 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package timer
+
+import (
+ "time"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/event"
+ "github.com/apache/skywalking-satellite/plugins/forwarder/api"
+)
+
+// Fallbacker is a timer fallbacker when forward fails. `latencyFactor` is the standard retry duration,
+// and the time for each retry is expanded by 2 times until the number of retries reaches the maximum.
+//
+// NOTE(review): both fields are unexported yet carry mapstructure tags; decoders of that
+// kind normally fill exported fields only — confirm how the configuration reaches them.
+type Fallbacker struct {
+ // maxTimes bounds the retry loop in FallBack.
+ maxTimes int `mapstructure:"max_times"`
+ // latencyFactor is the first retry delay; FallBack interprets it in milliseconds.
+ latencyFactor int `mapstructure:"latency_factor"`
+}
+
+// Name returns the plugin name, "timer-fallbacker".
+func (t *Fallbacker) Name() string {
+ return "timer-fallbacker"
+}
+
+// Description returns a fixed, human-readable summary of the timer fallbacker.
+func (t *Fallbacker) Description() string {
+ return "this is a timer trigger when forward fails."
+}
+
+// DefaultConfig returns the built-in default settings as a text snippet of
+// "key: value" lines: max_times is 3 and latency_factor is 2000.
+func (t *Fallbacker) DefaultConfig() string {
+ return `
+max_times: 3
+latency_factor: 2000
+`
+}
+
+// FallBack forwards batch over connection and retries on failure.
+//
+// After a failed first attempt it sleeps latencyFactor milliseconds, retries,
+// and doubles the delay after every further failure. It stops at the first
+// successful forward or once maxTimes attempts (the first one included) have
+// been made; the last error is not reported to the caller.
+//
+// NOTE(review): the Fallbacker interface in plugins/fallbacker/api declares
+// FallBack with an additional DisconnectionCallback parameter and a Fallbacker
+// return value — confirm whether this method is meant to match it.
+func (t *Fallbacker) FallBack(batch event.BatchEvents, connection interface{}, forward api.ForwardFunc) {
+	if err := forward(connection, batch); err == nil {
+		return
+	}
+	currentLatency := t.latencyFactor
+	// count is the number of attempts made so far. It has to advance on every
+	// retry, otherwise the loop never ends while forwarding keeps failing.
+	for count := 1; count < t.maxTimes; count++ {
+		time.Sleep(time.Duration(currentLatency) * time.Millisecond)
+		if err := forward(connection, batch); err == nil {
+			return
+		}
+		currentLatency *= 2
+	}
+}
diff --git a/plugins/filter/api/filter.go b/plugins/filter/api/filter.go
index 0ed43db..bc8e9bb 100644
--- a/plugins/filter/api/filter.go
+++ b/plugins/filter/api/filter.go
@@ -34,17 +34,17 @@ import (
type Filter interface {
plugin.Plugin
- // Process produces a new event by processing incoming event.
- Process(in event.Event) event.Event
+ // Process would fetch the needed event
+ Process(context *event.OutputEventContext)
}
var FilterCategory = reflect.TypeOf((*Filter)(nil)).Elem()
// Get filter plugin.
-func GetFilter(pluginName string, config map[string]interface{}) Filter {
- return plugin.Get(FilterCategory, pluginName, config).(Filter)
+func GetFilter(config plugin.DefaultConfig) Filter {
+ return plugin.Get(FilterCategory, config).(Filter)
}
func init() {
- plugin.AddPluginCategory(FilterCategory)
+ plugin.RegisterPluginCategory(FilterCategory, nil, nil, nil)
}
diff --git a/plugins/filter/example/filter_test.go b/plugins/filter/example/filter_test.go
deleted file mode 100644
index aae4371..0000000
--- a/plugins/filter/example/filter_test.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
- "testing"
-
- "github.com/apache/skywalking-satellite/internal/pkg/plugin"
- "github.com/apache/skywalking-satellite/plugins/filter/api"
-)
-
-func Test_Register(t *testing.T) {
- tests := []struct {
- name string
- args interface{}
- panic bool
- }{
- {
- name: "demoFilter",
- args: &demoFilter{
- a: "s",
- },
- panic: false,
- },
- {
- name: "demoFilter2",
- args: demoFilter2{
- a: "s",
- },
- panic: false,
- },
- {
- name: "demoFilter3",
- args: demoFilter3{
- a: "s",
- },
- panic: true,
- },
- }
-
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- plugin.RegisterPlugin(tt.name, tt.args)
- assertPanic(t, tt.name, nil, tt.panic)
- })
- }
-}
-
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
- defer func() {
- if r := recover(); r != nil && !existPanic {
- t.Errorf("the plugin %s is not pass", name)
- }
- }()
- api.GetFilter(name, config)
-}
diff --git a/plugins/forwarder/api/forwarder.go b/plugins/forwarder/api/forwarder.go
index 9e99f14..b0509cd 100644
--- a/plugins/forwarder/api/forwarder.go
+++ b/plugins/forwarder/api/forwarder.go
@@ -24,34 +24,25 @@ import (
"github.com/apache/skywalking-satellite/internal/pkg/plugin"
)
-// Init() Initiating stage: Init plugin by config
-// ||
-// \/
-// Prepare() Preparing stage: Prepare the Forwarder, such as get remote client.
-// ||
-// \/
-// Forward() Running stage: Forward the batch events
-// ||
-// \/
-// Close() Closing stage: Close the Collector, such as close connection with SkyWalking javaagent.
-
// Forwarder is a plugin interface, that defines new forwarders.
type Forwarder interface {
plugin.Plugin
// Forward the batch events to the external services, such as Kafka MQ and SkyWalking OAP cluster.
- Forward(batch event.BatchEvents)
-
- // ForwardType returns the supporting event type that could be forwarded.
+ Forward(connection interface{}, batch event.BatchEvents) error
+ // ForwardType returns the supported event type.
ForwardType() event.Type
}
-var ForwarderCategory = reflect.TypeOf((*Forwarder)(nil)).Elem()
+// ForwardFunc represents the Forward() method of Forwarder.
+type ForwardFunc func(connection interface{}, batch event.BatchEvents) error
-func GetForwarder(pluginName string, config map[string]interface{}) Forwarder {
- return plugin.Get(ForwarderCategory, pluginName, config).(Forwarder)
+// GetForwarder returns an initialized forwarder plugin.
+func GetForwarder(config map[string]interface{}) Forwarder {
+ return plugin.Get(reflect.TypeOf((*Forwarder)(nil)).Elem(), config).(Forwarder)
}
+// init registers the Forwarder interface as a plugin category.
func init() {
- plugin.AddPluginCategory(ForwarderCategory)
+ plugin.RegisterPluginCategory(reflect.TypeOf((*Forwarder)(nil)).Elem(), nil, nil, nil)
}
diff --git a/plugins/forwarder/example/forwarder.go b/plugins/forwarder/example/forwarder.go
deleted file mode 100644
index 637f9e1..0000000
--- a/plugins/forwarder/example/forwarder.go
+++ /dev/null
@@ -1,62 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import "github.com/apache/skywalking-satellite/internal/pkg/event"
-
-type demoForwarder struct {
- a string
-}
-
-type demoForwarder2 struct {
- a string
-}
-
-type demoForwarder3 struct {
- a string
-}
-
-func (d *demoForwarder) Description() string {
- panic("implement me")
-}
-
-func (d *demoForwarder) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoForwarder) Forward(batch event.BatchEvents) {
- panic("implement me")
-}
-
-func (d demoForwarder2) Description() string {
- panic("implement me")
-}
-
-func (d demoForwarder2) InitPlugin(config map[string]interface{}) {
-}
-
-func (d demoForwarder2) Forward(batch event.BatchEvents) {
- panic("implement me")
-}
-
-func (d demoForwarder2) ForwardType() event.Type {
- panic("implement me")
-}
-
-func (d *demoForwarder) ForwardType() event.Type {
- panic("implement me")
-}
diff --git a/plugins/forwarder/example/forwarder_test.go b/plugins/forwarder/example/forwarder_test.go
deleted file mode 100644
index 28de35d..0000000
--- a/plugins/forwarder/example/forwarder_test.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
- "testing"
-
- "github.com/apache/skywalking-satellite/internal/pkg/plugin"
- "github.com/apache/skywalking-satellite/plugins/forwarder/api"
-)
-
-func Test_Register(t *testing.T) {
- tests := []struct {
- name string
- args interface{}
- panic bool
- }{
- {
- name: "demoForwarder",
- args: &demoForwarder{
- a: "s",
- },
- panic: false,
- },
- {
- name: "demoForwarder2",
- args: demoForwarder2{
- a: "s",
- },
- panic: false,
- },
- {
- name: "demoForwarder3",
- args: demoForwarder3{
- a: "s",
- },
- panic: true,
- },
- }
-
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- plugin.RegisterPlugin(tt.name, tt.args)
- assertPanic(t, tt.name, nil, tt.panic)
- })
- }
-}
-
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
- defer func() {
- if r := recover(); r != nil && !existPanic {
- t.Errorf("the plugin %s is not pass", name)
- }
- }()
- api.GetForwarder(name, config)
-}
diff --git a/plugins/parser/api/parser.go b/plugins/parser/api/parser.go
index 3ca5ebe..cba3bca 100644
--- a/plugins/parser/api/parser.go
+++ b/plugins/parser/api/parser.go
@@ -38,12 +38,10 @@ type Parser interface {
ParseStr(str string) ([]event.SerializableEvent, error)
}
-var ParserCategory = reflect.TypeOf((*Parser)(nil)).Elem()
-
-func GetParser(pluginName string, config map[string]interface{}) Parser {
- return plugin.Get(ParserCategory, pluginName, config).(Parser)
+func GetParser(pluginName string, config plugin.DefaultConfig) Parser {
+ return plugin.Get(reflect.TypeOf((*Parser)(nil)).Elem(), config).(Parser)
}
func init() {
- plugin.AddPluginCategory(ParserCategory)
+ plugin.RegisterPluginCategory(reflect.TypeOf((*Parser)(nil)).Elem(), nil, nil, nil)
}
diff --git a/plugins/parser/example/parser.go b/plugins/parser/example/parser.go
deleted file mode 100644
index 839956f..0000000
--- a/plugins/parser/example/parser.go
+++ /dev/null
@@ -1,62 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import "github.com/apache/skywalking-satellite/internal/pkg/event"
-
-type demoParser struct {
- a string
-}
-
-type demoParser2 struct {
- a string
-}
-
-type demoParser3 struct {
- a string
-}
-
-func (d *demoParser) Description() string {
- panic("implement me")
-}
-
-func (d *demoParser) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoParser) ParseBytes(bytes []byte) ([]event.SerializableEvent, error) {
- panic("implement me")
-}
-
-func (d *demoParser) ParseStr(str string) ([]event.SerializableEvent, error) {
- panic("implement me")
-}
-
-func (d demoParser2) Description() string {
- panic("implement me")
-}
-
-func (d demoParser2) InitPlugin(config map[string]interface{}) {
-}
-
-func (d demoParser2) ParseBytes(bytes []byte) ([]event.SerializableEvent, error) {
- panic("implement me")
-}
-
-func (d demoParser2) ParseStr(str string) ([]event.SerializableEvent, error) {
- panic("implement me")
-}
diff --git a/plugins/parser/example/parser_test.go b/plugins/parser/example/parser_test.go
deleted file mode 100644
index b577f7d..0000000
--- a/plugins/parser/example/parser_test.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
- "testing"
-
- "github.com/apache/skywalking-satellite/internal/pkg/plugin"
- "github.com/apache/skywalking-satellite/plugins/parser/api"
-)
-
-func Test_Register(t *testing.T) {
- tests := []struct {
- name string
- args interface{}
- panic bool
- }{
- {
- name: "demoParser",
- args: &demoParser{
- a: "s",
- },
- panic: false,
- },
- {
- name: "demoParser2",
- args: demoParser2{
- a: "s",
- },
- panic: false,
- },
- {
- name: "demoParser3",
- args: demoParser3{
- a: "s",
- },
- panic: true,
- },
- }
-
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- plugin.RegisterPlugin(tt.name, tt.args)
- assertPanic(t, tt.name, nil, tt.panic)
- })
- }
-}
-
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
- defer func() {
- if r := recover(); r != nil && !existPanic {
- t.Errorf("the plugin %s is not pass", name)
- }
- }()
- api.GetParser(name, config)
-}
diff --git a/plugins/queue/api/queue.go b/plugins/queue/api/queue.go
index 9b95156..f6d4d69 100644
--- a/plugins/queue/api/queue.go
+++ b/plugins/queue/api/queue.go
@@ -28,6 +28,9 @@ import (
type Queue interface {
plugin.Plugin
+ // Prepare creates the queue.
+ Prepare() error
+
// Publisher get the only publisher for the current queue.
Publisher() QueuePublisher
@@ -35,27 +38,30 @@ type Queue interface {
Consumer() QueueConsumer
// Close would close the queue.
- Close()
+ Close() error
+
+ // Ack acknowledges a batch.
+ Ack(startOffset int64, batchSize int) chan struct{}
}
// QueuePublisher is a plugin interface, that defines new queue publishers.
type QueuePublisher interface {
// Enqueue push a inputEvent into the queue.
- Enqueue(event *event.SerializableEvent) error
+ Enqueue(event event.SerializableEvent) error
}
// QueueConsumer is a plugin interface, that defines new queue consumers.
type QueueConsumer interface {
// Dequeue pop an event form the Queue. When the queue is empty, the method would be blocked.
- Dequeue() (event *event.SerializableEvent, offset int64, err error)
+ Dequeue() (event event.SerializableEvent, offset int64, err error)
}
var QueueCategory = reflect.TypeOf((*Queue)(nil)).Elem()
-func GetQueue(pluginName string, config map[string]interface{}) Queue {
- return plugin.Get(QueueCategory, pluginName, config).(Queue)
+func GetQueue(config plugin.DefaultConfig) Queue {
+ return plugin.Get(QueueCategory, config).(Queue)
}
func init() {
- plugin.AddPluginCategory(QueueCategory)
+ plugin.RegisterPluginCategory(QueueCategory, nil, nil, nil)
}
diff --git a/plugins/queue/example/queue.go b/plugins/queue/example/queue.go
deleted file mode 100644
index 5a3055a..0000000
--- a/plugins/queue/example/queue.go
+++ /dev/null
@@ -1,72 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
- "github.com/apache/skywalking-satellite/plugins/queue/api"
-)
-
-type demoQueue struct {
- a string
-}
-
-type demoQueue2 struct {
- a string
-}
-
-type demoQueue3 struct {
- a string
-}
-
-func (d *demoQueue) Description() string {
- panic("implement me")
-}
-
-func (d *demoQueue) InitPlugin(config map[string]interface{}) {
-}
-
-func (d *demoQueue) Publisher() api.QueuePublisher {
- panic("implement me")
-}
-
-func (d *demoQueue) Consumer() api.QueueConsumer {
- panic("implement me")
-}
-
-func (d *demoQueue) Close() {
- panic("implement me")
-}
-
-func (d demoQueue2) Description() string {
- panic("implement me")
-}
-
-func (d demoQueue2) InitPlugin(config map[string]interface{}) {
-}
-
-func (d demoQueue2) Publisher() api.QueuePublisher {
- panic("implement me")
-}
-
-func (d demoQueue2) Consumer() api.QueueConsumer {
- panic("implement me")
-}
-
-func (d demoQueue2) Close() {
- panic("implement me")
-}
diff --git a/plugins/queue/example/queue_test.go b/plugins/queue/example/queue_test.go
deleted file mode 100644
index 0afd20d..0000000
--- a/plugins/queue/example/queue_test.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package example
-
-import (
- "testing"
-
- "github.com/apache/skywalking-satellite/internal/pkg/plugin"
- "github.com/apache/skywalking-satellite/plugins/queue/api"
-)
-
-func Test_Register(t *testing.T) {
- tests := []struct {
- name string
- args interface{}
- panic bool
- }{
- {
- name: "demoQueue",
- args: &demoQueue{
- a: "s",
- },
- panic: false,
- },
- {
- name: "demoQueue2",
- args: demoQueue2{
- a: "s",
- },
- panic: false,
- },
- {
- name: "demoQueue3",
- args: demoQueue3{
- a: "s",
- },
- panic: true,
- },
- }
-
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- plugin.RegisterPlugin(tt.name, tt.args)
- assertPanic(t, tt.name, nil, tt.panic)
- })
- }
-}
-
-func assertPanic(t *testing.T, name string, config map[string]interface{}, existPanic bool) {
- defer func() {
- if r := recover(); r != nil && !existPanic {
- t.Errorf("the plugin %s is not pass", name)
- }
- }()
- api.GetQueue(name, config)
-}