Posted to notifications@skywalking.apache.org by li...@apache.org on 2021/04/07 07:31:56 UTC
[skywalking-satellite] 01/01: fix some bugs in Satellite
This is an automated email from the ASF dual-hosted git repository.
liujiapeng pushed a commit to branch fix-race-bug
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
commit f353fac01c867f9198f289c56132fb4d8ae8129a
Author: Evan <ev...@outlook.com>
AuthorDate: Wed Apr 7 15:30:48 2021 +0800
fix some bugs in Satellite
---
CHANGES.md | 22 +---
README.md | 4 +-
docs/en/concepts-and-designs/mmap-queue.md | 24 ++--
docs/en/guides/contribution/How-to-release.md | 4 +-
docs/en/guides/contribution/How-to-write-plugin.md | 2 +-
go.mod | 8 +-
go.sum | 14 ++-
internal/satellite/module/sender/create.go | 3 +-
internal/satellite/module/sender/sender.go | 138 ++++++++++++---------
plugins/queue/mmap/meta/meta.go | 32 ++++-
plugins/queue/mmap/queue.go | 19 +--
plugins/queue/mmap/queue_lock.go | 54 ++++++++
plugins/queue/mmap/queue_operation.go | 13 +-
plugins/queue/mmap/segment_operation.go | 7 +-
protocol/gen-codes/skywalking/network/go.mod | 6 +-
protocol/gen-codes/skywalking/network/go.sum | 16 ++-
16 files changed, 240 insertions(+), 126 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 1b4032e..be5f0f5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,25 +2,15 @@ Changes by Version
==================
Release Notes.
-0.1.0
+0.2.0
------------------
#### Features
-* Build the Satellite core structure.
-* Add prometheus self telemetry.
-* Add kafka client plugin.
-* Add none-fallbacker plugin.
-* Add timer-fallbacker plugin.
-* Add nativelog-kafka-forwarder plugin.
-* Add memory-queue plugin.
-* Add mmap-queue plugin.
-* Add grpc-nativelog-receiver plugin.
-* Add http-nativelog-receiver plugin.
-* Add grpc-server plugin.
-* Add http-server plugin.
-* Add prometheus-server plugin.
+
#### Bug Fixes
+Fix the data race in mmap queue.
+Fix channel blocking in sender module.
#### Issues and PR
-- All issues are [here](https://github.com/apache/skywalking/milestone/64?closed=1)
-- All and pull requests are [here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Aopen+is%3Apr+milestone%3A0.1.0)
+- All issues are [here](https://github.com/apache/skywalking/milestone/80?closed=1)
+- All and pull requests are [here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Aopen+is%3Apr+milestone%3A0.2.0)
diff --git a/README.md b/README.md
index b1d97b7..6915593 100644
--- a/README.md
+++ b/README.md
@@ -21,7 +21,7 @@ NOTICE, SkyWalking Satellite uses [v3 protocols](https://github.com/apache/skywa
Please read [the doc](./docs/en/FAQ/performance.md) to get more details.
# Download
-Go to the [download page](https://skywalking.apache.org/downloads/) to download all available binaries, including MacOS, Linux and Windows. Due to system compatibility problems, some plugins of SkyWalking Satellite cannot be used in Windows system. Check [the corresponding documentation](./docs/en/guides/compile/How-to-compile.md) to see whether the plugin is available on Windows.
+Go to the [download page](https://skywalking.apache.org/downloads/) to download all available binaries, including MacOS, Linux and Windows. Due to system compatibility problems, some plugins of SkyWalking Satellite cannot be used in Windows system. Check [the corresponding documentation](./docs/en/setup/plugins) to see whether the plugin is available on Windows.
# Compile
As SkyWalking Satellite is using `Makefile`, compiling the project is as easy as executing a command in the root directory of the project.
@@ -32,7 +32,7 @@ git submodule init
git submodule update
make build
```
-If you want to know more details about compiling, please read [the doc](./docs/en/guides/compile/How-to-compile.md).
+If you want to know more details about compiling, please read [the doc](./docs/en/guides/compile/compile.md).
# Commands
diff --git a/docs/en/concepts-and-designs/mmap-queue.md b/docs/en/concepts-and-designs/mmap-queue.md
index df0eaf0..d3b3582 100644
--- a/docs/en/concepts-and-designs/mmap-queue.md
+++ b/docs/en/concepts-and-designs/mmap-queue.md
@@ -41,12 +41,12 @@ goos: darwin
goarch: amd64
pkg: github.com/apache/skywalking-satellite/plugins/queue/mmap
BenchmarkEnqueue
-BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000 10000 106520 ns/op 9888 B/op 9 allocs/op
-BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000 18536 54331 ns/op 9839 B/op 9 allocs/op
-BenchmarkEnqueue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000 27859 43251 ns/op 9815 B/op 9 allocs/op
-BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000 23673 45910 ns/op 9839 B/op 9 allocs/op
-BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000 10000 131686 ns/op 18941 B/op 10 allocs/op
-BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000 23011 47101 ns/op 9887 B/op 9 allocs/op
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000 27585 43559 ns/op 9889 B/op 9 allocs/op
+BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000 39326 31773 ns/op 9840 B/op 9 allocs/op
+BenchmarkEnqueue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000 56770 22990 ns/op 9816 B/op 9 allocs/op
+BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000 43803 29778 ns/op 9840 B/op 9 allocs/op
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000 16870 80576 ns/op 18944 B/op 10 allocs/op
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000 36922 39085 ns/op 9889 B/op 9 allocs/op
PASS
```
### push and pop operation
@@ -55,11 +55,11 @@ goos: darwin
goarch: amd64
pkg: github.com/apache/skywalking-satellite/plugins/queue/mmap
BenchmarkEnqueueAndDequeue
-BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000 18895 53056 ns/op 28773 B/op 42 allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000 24104 117128 ns/op 28725 B/op 42 allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000 23733 71632 ns/op 28699 B/op 41 allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000 26286 64377 ns/op 28725 B/op 42 allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000 10000 118004 ns/op 54978 B/op 43 allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000 16489 64400 ns/op 28772 B/op 42 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000 21030 60728 ns/op 28774 B/op 42 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000 30327 41274 ns/op 28726 B/op 42 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000 32738 37923 ns/op 28700 B/op 42 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000 28209 41169 ns/op 28726 B/op 42 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000 14677 89637 ns/op 54981 B/op 43 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000 22228 54963 ns/op 28774 B/op 42 allocs/op
PASS
```
diff --git a/docs/en/guides/contribution/How-to-release.md b/docs/en/guides/contribution/How-to-release.md
index af07674..ad5dbd8 100644
--- a/docs/en/guides/contribution/How-to-release.md
+++ b/docs/en/guides/contribution/How-to-release.md
@@ -5,7 +5,7 @@ This documentation guides the release manager to release the SkyWalking Satellit
## Prerequisites
1. Close(if finished, or move to next milestone otherwise) all issues in the current milestone from [skywalking-satellite](https://github.com/apache/skywalking-satellite/milestones) and [skywalking](https://github.com/apache/skywalking/milestones), create a new milestone if needed.
-2. Update [CHANGES.md](../../../../CHANGES.md).
+2. Update [CHANGES.md](../CHANGES.md).
## Add your GPG public key to Apache svn
@@ -152,7 +152,7 @@ are in `https://dist.apache.org/repos/dist/dev/skywalking/satellite/$VERSION` wi
1. `LICENSE` and `NOTICE` are in source codes and distribution package.
1. Check `shasum -c skywalking-satellite-$VERSION-{src,bin}.tgz.sha512`.
1. Check `gpg --verify skywalking-satellite-$VERSION-{src,bin}.tgz.asc skywalking-satellite-$VERSION-{src,bin}.tgz`.
-1. Build distribution from source code package by following this command, `make build`.
+1. Build distribution from source code package by following this [the build guide](#build-and-sign-the-source-code-package).
1. Licenses check, `make license`.
Vote result should follow these:
diff --git a/docs/en/guides/contribution/How-to-write-plugin.md b/docs/en/guides/contribution/How-to-write-plugin.md
index 555f046..b2f4a61 100644
--- a/docs/en/guides/contribution/How-to-write-plugin.md
+++ b/docs/en/guides/contribution/How-to-write-plugin.md
@@ -32,7 +32,7 @@ Let's use memory-queue as an example of how to write a plugin.
event_buffer_size: 5000
```
-3. Add [unit test](../test/How-to-unit-test.md).
+3. Add [unit test](../test/test.md).
4. Generate the plugin docs.
```shell script
make check
diff --git a/go.mod b/go.mod
index 0c7e00c..d1d86c4 100644
--- a/go.mod
+++ b/go.mod
@@ -7,14 +7,14 @@ replace skywalking/network v1.0.0 => ./protocol/gen-codes/skywalking/network
require (
github.com/Shopify/sarama v1.27.2
github.com/enriquebris/goconcurrentqueue v0.6.0
- github.com/golang/protobuf v1.4.3
- github.com/google/go-cmp v0.5.4
+ github.com/golang/protobuf v1.5.2
+ github.com/google/go-cmp v0.5.5
github.com/grandecola/mmap v0.6.0
github.com/prometheus/client_golang v1.9.0
github.com/sirupsen/logrus v1.7.0
github.com/spf13/viper v1.7.1
github.com/urfave/cli/v2 v2.3.0
- google.golang.org/grpc v1.35.0
- google.golang.org/protobuf v1.25.0
+ google.golang.org/grpc v1.36.1
+ google.golang.org/protobuf v1.26.0
skywalking/network v1.0.0
)
diff --git a/go.sum b/go.sum
index 13be86f..5ec45ee 100644
--- a/go.sum
+++ b/go.sum
@@ -134,6 +134,9 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
+github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
@@ -147,8 +150,8 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
-github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
+github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
@@ -589,8 +592,8 @@ google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQ
google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
-google.golang.org/grpc v1.35.0 h1:TwIQcH3es+MojMVojxxfQ3l3OF2KzlRxML2xZq0kRo8=
-google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
+google.golang.org/grpc v1.36.1 h1:cmUfbeGKnz9+2DD/UYsMQXeqbHZqZDs4eQwW0sFOpBY=
+google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -601,6 +604,9 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
+google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
diff --git a/internal/satellite/module/sender/create.go b/internal/satellite/module/sender/create.go
index 5da3cca..12906e2 100644
--- a/internal/satellite/module/sender/create.go
+++ b/internal/satellite/module/sender/create.go
@@ -36,8 +36,7 @@ func NewSender(cfg *api.SenderConfig, g gatherer.Gatherer) api.Sender {
runningFallbacker: fallbacker.GetFallbacker(cfg.FallbackerConfig),
runningClient: sharing.Manager[cfg.ClientName].(client.Client),
gatherer: g,
- logicInput: nil,
- physicalInput: make(chan *event.OutputEventContext),
+ input: make(chan *event.OutputEventContext),
listener: make(chan client.ClientStatus),
flushChannel: make(chan *buffer.BatchBuffer, 1),
buffer: buffer.NewBatchBuffer(cfg.MaxBufferSize),
diff --git a/internal/satellite/module/sender/sender.go b/internal/satellite/module/sender/sender.go
index 86eec6d..e75e24c 100644
--- a/internal/satellite/module/sender/sender.go
+++ b/internal/satellite/module/sender/sender.go
@@ -20,6 +20,7 @@ package sender
import (
"context"
"sync"
+ "sync/atomic"
"time"
"github.com/sirupsen/logrus"
@@ -51,11 +52,11 @@ type Sender struct {
gatherer gatherer.Gatherer
// self components
- logicInput chan *event.OutputEventContext // logic input channel
- physicalInput chan *event.OutputEventContext // physical input channel
- listener chan client.ClientStatus // client status listener
- flushChannel chan *buffer.BatchBuffer // forwarder flush channel
- buffer *buffer.BatchBuffer // cache the downstream input data
+ input chan *event.OutputEventContext // physical input channel
+ listener chan client.ClientStatus // client status listener
+ flushChannel chan *buffer.BatchBuffer // forwarder flush channel
+ buffer *buffer.BatchBuffer // cache the downstream input data
+ blocking int32 // the status of input channel
// metrics
sendCounter *telemetry.Counter
@@ -65,7 +66,6 @@ type Sender struct {
func (s *Sender) Prepare() error {
log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is preparing...")
s.runningClient.RegisterListener(s.listener)
- s.logicInput = s.physicalInput
for _, runningForwarder := range s.runningForwarders {
err := runningForwarder.Prepare(s.runningClient.GetConnectedClient())
if err != nil {
@@ -80,64 +80,90 @@ func (s *Sender) Prepare() error {
func (s *Sender) Boot(ctx context.Context) {
log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is starting...")
var wg sync.WaitGroup
- wg.Add(2)
- // 1. keep fetching the downstream data when client connected, and put it into BatchBuffer.
- // 2. When reaches the buffer limit or receives a timer flush signal, and put BatchBuffer into flushChannel.
- go func() {
- defer wg.Done()
- childCtx, cancel := context.WithCancel(ctx)
- timeTicker := time.NewTicker(time.Duration(s.config.FlushTime) * time.Millisecond)
- for {
- select {
- case status := <-s.listener:
- switch status {
- case client.Connected:
- log.Logger.WithField("pipe", s.config.PipeName).Info("the client connection of the sender module is connected")
- s.logicInput = s.physicalInput
- case client.Disconnect:
- log.Logger.WithField("pipe", s.config.PipeName).Info("the client connection of the sender module is disconnected")
- s.logicInput = nil
- }
- case <-timeTicker.C:
- if s.buffer.Len() > s.config.MinFlushEvents {
- s.flushChannel <- s.buffer
- s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
- }
- case e := <-s.logicInput:
- s.buffer.Add(e)
- if s.buffer.Len() == s.config.MaxBufferSize {
- s.flushChannel <- s.buffer
- s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
- }
- case <-childCtx.Done():
- cancel()
- s.logicInput = nil
- return
+ wg.Add(3)
+ go s.store(ctx, &wg)
+ go s.listen(ctx, &wg)
+ go s.flush(ctx, &wg)
+ wg.Wait()
+}
+
+// store data.
+// 1. keep fetching the downstream data when client connected, and put it into BatchBuffer.
+// 2. When reaches the buffer limit or receives a timer flush signal, and put BatchBuffer into flushChannel.
+func (s *Sender) store(ctx context.Context, wg *sync.WaitGroup) {
+ defer wg.Done()
+ defer log.Logger.WithField("pipe", s.config.PipeName).Infof("store routine closed")
+ childCtx, _ := context.WithCancel(ctx) // nolint
+ timeTicker := time.NewTicker(time.Duration(s.config.FlushTime) * time.Millisecond)
+ for {
+ // blocking output when disconnecting.
+ if atomic.LoadInt32(&s.blocking) == 1 {
+ time.Sleep(100 * time.Millisecond)
+ continue
+ }
+ select {
+ case <-childCtx.Done():
+ return
+ case <-timeTicker.C:
+ if s.buffer.Len() > s.config.MinFlushEvents {
+ s.flushChannel <- s.buffer
+ s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
+ }
+ case e := <-s.input:
+ if e == nil {
+ continue
+ }
+ s.buffer.Add(e)
+ if s.buffer.Len() == s.config.MaxBufferSize {
+ s.flushChannel <- s.buffer
+ s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
}
}
- }()
- // Keep fetching BatchBuffer to forward.
- go func() {
- defer wg.Done()
- childCtx, cancel := context.WithCancel(ctx)
- for {
- select {
- case b := <-s.flushChannel:
- s.consume(b)
- case <-childCtx.Done():
- cancel()
- s.Shutdown()
- return
+ }
+}
+
+// Listen the client status.
+func (s *Sender) listen(ctx context.Context, wg *sync.WaitGroup) {
+ defer wg.Done()
+ defer log.Logger.WithField("pipe", s.config.PipeName).Infof("listen routine closed")
+ childCtx, _ := context.WithCancel(ctx) // nolint
+ for {
+ select {
+ case <-childCtx.Done():
+ return
+ case status := <-s.listener:
+ switch status {
+ case client.Connected:
+ log.Logger.WithField("pipe", s.config.PipeName).Info("the client connection of the sender module connected")
+ atomic.StoreInt32(&s.blocking, 0)
+ case client.Disconnect:
+ log.Logger.WithField("pipe", s.config.PipeName).Info("the client connection of the sender module disconnected")
+ atomic.StoreInt32(&s.blocking, 1)
}
}
- }()
- wg.Wait()
+ }
+}
+
+// Keep fetching BatchBuffer to forward.
+func (s *Sender) flush(ctx context.Context, wg *sync.WaitGroup) {
+ defer wg.Done()
+ defer log.Logger.WithField("pipe", s.config.PipeName).Infof("flush routine closed")
+ childCtx, _ := context.WithCancel(ctx) // nolint
+ for {
+ select {
+ case <-childCtx.Done():
+ s.Shutdown()
+ return
+ case b := <-s.flushChannel:
+ s.consume(b)
+ }
+ }
}
// Shutdown closes the channels and tries to force forward the events in the buffer.
func (s *Sender) Shutdown() {
log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is closing")
- close(s.physicalInput)
+ close(s.input)
ticker := time.NewTicker(module.ShutdownHookTime)
for {
select {
@@ -187,5 +213,5 @@ func (s *Sender) consume(batch *buffer.BatchBuffer) {
}
func (s *Sender) InputDataChannel() chan<- *event.OutputEventContext {
- return s.logicInput
+ return s.input
}
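Note on the sender change: the old code paused consumption by setting `logicInput` to nil, while the new code keeps a single `input` channel and gates the `store` loop on an atomic `blocking` flag that the `listen` routine flips. The following is a minimal, self-contained sketch of that gating pattern, not the Satellite implementation. The names `gate`, `consume` and `demo` are hypothetical, and unlike the diff the paused branch also watches `ctx.Done()` so a blocked consumer can still exit.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// gate is a hypothetical stand-in for the sender's blocking flag.
type gate struct {
	blocking int32 // 1 while the downstream client is disconnected
}

func (g *gate) setBlocked(b bool) {
	var v int32
	if b {
		v = 1
	}
	atomic.StoreInt32(&g.blocking, v)
}

func (g *gate) blocked() bool { return atomic.LoadInt32(&g.blocking) == 1 }

// consume drains input until ctx is cancelled and pauses while the gate is blocked.
func consume(ctx context.Context, g *gate, input <-chan int, handle func(int)) {
	for {
		if g.blocked() {
			// Poll the flag again after a short pause, but stay cancellable.
			select {
			case <-ctx.Done():
				return
			case <-time.After(10 * time.Millisecond):
				continue
			}
		}
		select {
		case <-ctx.Done():
			return
		case v := <-input:
			handle(v)
		}
	}
}

// demo reports how many events were handled while blocked and after unblocking.
func demo() (whileBlocked, afterUnblock int32) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	g := &gate{}
	g.setBlocked(true) // start in the "disconnected" state

	input := make(chan int, 3)
	for i := 0; i < 3; i++ {
		input <- i
	}

	var handled int32
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		consume(ctx, g, input, func(int) { atomic.AddInt32(&handled, 1) })
	}()

	time.Sleep(50 * time.Millisecond)
	whileBlocked = atomic.LoadInt32(&handled)

	g.setBlocked(false) // "reconnected": the consumer resumes
	deadline := time.Now().Add(2 * time.Second)
	for atomic.LoadInt32(&handled) < 3 && time.Now().Before(deadline) {
		time.Sleep(time.Millisecond)
	}
	afterUnblock = atomic.LoadInt32(&handled)

	cancel()
	wg.Wait()
	return whileBlocked, afterUnblock
}

func main() {
	whileBlocked, afterUnblock := demo()
	fmt.Println("handled while blocked:", whileBlocked)
	fmt.Println("handled after unblock:", afterUnblock)
}
```

Because the flag is only read and written through `sync/atomic`, the listener and the consumer can run in separate goroutines without sharing a mutable channel field.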
diff --git a/plugins/queue/mmap/meta/meta.go b/plugins/queue/mmap/meta/meta.go
index 11f75fa..36a5faa 100644
--- a/plugins/queue/mmap/meta/meta.go
+++ b/plugins/queue/mmap/meta/meta.go
@@ -22,6 +22,7 @@ package meta
import (
"fmt"
"path/filepath"
+ "sync"
"syscall"
"github.com/grandecola/mmap"
@@ -60,6 +61,7 @@ type Metadata struct {
name string
size int
capacity int
+ lock sync.RWMutex
}
// NewMetaData read or create a Metadata with supported metaVersion
@@ -92,76 +94,104 @@ func NewMetaData(metaDir string, capacity int) (*Metadata, error) {
// GetVersion returns the meta version.
func (m *Metadata) GetVersion() int {
+ m.lock.RLock()
+ defer m.lock.RUnlock()
return int(m.metaFile.ReadUint64At(versionPos))
}
// PutVersion put the version into the memory mapped file.
func (m *Metadata) PutVersion(version int64) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
m.metaFile.WriteUint64At(uint64(version), versionPos)
}
// GetWritingOffset returns the writing offset, which contains the segment ID and the offset of the segment.
func (m *Metadata) GetWritingOffset() (segmentID, offset int64) {
+ m.lock.RLock()
+ defer m.lock.RUnlock()
return int64(m.metaFile.ReadUint64At(widPos)), int64(m.metaFile.ReadUint64At(woffsetPos))
}
// PutWritingOffset put the segment ID and the offset of the segment into the writing offset.
func (m *Metadata) PutWritingOffset(segmentID, offset int64) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
m.metaFile.WriteUint64At(uint64(segmentID), widPos)
m.metaFile.WriteUint64At(uint64(offset), woffsetPos)
}
// GetWatermarkOffset returns the watermark offset, which contains the segment ID and the offset of the segment.
func (m *Metadata) GetWatermarkOffset() (segmentID, offset int64) {
+ m.lock.RLock()
+ defer m.lock.RUnlock()
return int64(m.metaFile.ReadUint64At(wmidPos)), int64(m.metaFile.ReadUint64At(wmoffsetPos))
}
// PutWatermarkOffset put the segment ID and the offset of the segment into the watermark offset.
func (m *Metadata) PutWatermarkOffset(segmentID, offset int64) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
m.metaFile.WriteUint64At(uint64(segmentID), wmidPos)
m.metaFile.WriteUint64At(uint64(offset), wmoffsetPos)
}
// GetCommittedOffset returns the committed offset, which contains the segment ID and the offset of the segment.
func (m *Metadata) GetCommittedOffset() (segmentID, offset int64) {
+ m.lock.RLock()
+ defer m.lock.RUnlock()
return int64(m.metaFile.ReadUint64At(cidPos)), int64(m.metaFile.ReadUint64At(coffsetPos))
}
// PutCommittedOffset put the segment ID and the offset of the segment into the committed offset.
func (m *Metadata) PutCommittedOffset(segmentID, offset int64) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
m.metaFile.WriteUint64At(uint64(segmentID), cidPos)
m.metaFile.WriteUint64At(uint64(offset), coffsetPos)
}
// GetReadingOffset returns the reading offset, which contains the segment ID and the offset of the segment.
func (m *Metadata) GetReadingOffset() (segmentID, offset int64) {
+ m.lock.RLock()
+ defer m.lock.RUnlock()
return int64(m.metaFile.ReadUint64At(ridPos)), int64(m.metaFile.ReadUint64At(roffsetPos))
}
// PutReadingOffset put the segment ID and the offset of the segment into the reading offset.
func (m *Metadata) PutReadingOffset(segmentID, offset int64) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
m.metaFile.WriteUint64At(uint64(segmentID), ridPos)
m.metaFile.WriteUint64At(uint64(offset), roffsetPos)
}
// GetCapacity returns the capacity of the queue.
func (m *Metadata) GetCapacity() int {
+ m.lock.RLock()
+ defer m.lock.RUnlock()
return int(m.metaFile.ReadUint64At(capacityPos))
}
// PutCapacity put the capacity into the memory mapped file.
func (m *Metadata) PutCapacity(version int64) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
m.metaFile.WriteUint64At(uint64(version), capacityPos)
}
// Flush the memory mapped file to the disk.
func (m *Metadata) Flush() error {
+ m.lock.Lock()
+ defer m.lock.Unlock()
return m.metaFile.Flush(syscall.MS_SYNC)
}
// Close do Flush operation and unmap the memory mapped file.
func (m *Metadata) Close() error {
- if err := m.Flush(); err != nil {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ if err := m.metaFile.Flush(syscall.MS_SYNC); err != nil {
return err
}
return m.metaFile.Unmap()
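Note on the metadata change: each getter takes a read lock and each setter takes the write lock, so a segment ID and its offset are always read or written as a pair. `Close` now calls `m.metaFile.Flush` directly, which is consistent with `sync.RWMutex` not being reentrant: calling `m.Flush()` while already holding the lock would block forever. A minimal sketch of the same pattern follows. The `offsets` type and its fields are hypothetical in-memory stand-ins, not the mmap-backed `Metadata` type.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// offsets is a hypothetical in-memory stand-in for the mmap-backed metadata.
type offsets struct {
	lock      sync.RWMutex
	segmentID int64
	offset    int64
	flushes   int
}

// Put writes both values under the write lock so readers never see a torn pair.
func (o *offsets) Put(segmentID, offset int64) {
	o.lock.Lock()
	defer o.lock.Unlock()
	o.segmentID = segmentID
	o.offset = offset
}

// Get reads both values under the read lock.
func (o *offsets) Get() (segmentID, offset int64) {
	o.lock.RLock()
	defer o.lock.RUnlock()
	return o.segmentID, o.offset
}

// flushLocked does the actual work and must be called with the lock held.
func (o *offsets) flushLocked() { o.flushes++ }

// Flush is the public entry point and takes the lock itself.
func (o *offsets) Flush() {
	o.lock.Lock()
	defer o.lock.Unlock()
	o.flushLocked()
}

// Close holds the lock and calls flushLocked rather than Flush,
// because re-locking a held sync.RWMutex would deadlock.
func (o *offsets) Close() {
	o.lock.Lock()
	defer o.lock.Unlock()
	o.flushLocked()
}

// pairsStayConsistent runs concurrent writers and readers and reports
// whether every observed pair satisfied offset == segmentID*10.
func pairsStayConsistent() bool {
	o := &offsets{}
	var torn int32
	var wg sync.WaitGroup
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func(base int64) {
			defer wg.Done()
			for i := int64(0); i < 1000; i++ {
				id := base*1000 + i
				o.Put(id, id*10)
			}
		}(int64(w))
	}
	for r := 0; r < 4; r++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				if id, off := o.Get(); off != id*10 {
					atomic.AddInt32(&torn, 1)
				}
			}
		}()
	}
	wg.Wait()
	o.Close()
	return atomic.LoadInt32(&torn) == 0 && o.flushes == 1
}

func main() {
	fmt.Println("pairs consistent:", pairsStayConsistent())
}
```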
diff --git a/plugins/queue/mmap/queue.go b/plugins/queue/mmap/queue.go
index 44d20ee..539c59f 100644
--- a/plugins/queue/mmap/queue.go
+++ b/plugins/queue/mmap/queue.go
@@ -61,7 +61,6 @@ type Queue struct {
MaxEventSize int `mapstructure:"max_event_size"` // The max size of the input event.
// running components
- lock sync.Mutex
queueName string // The queue name.
meta *meta.Metadata // The metadata file.
segments []*mmap.File // The data files.
@@ -72,6 +71,7 @@ type Queue struct {
sufficientMemChannel chan struct{} // Notify when memory is sufficient
markReadChannel chan int64 // Transfer the read segmentID to do ummap operation.
ready bool // The status of the queue.
+ locker []int32 // locker
// control components
ctx context.Context // Parent ctx
@@ -145,6 +145,7 @@ func (q *Queue) Initialize() error {
q.ctx, q.cancel = context.WithCancel(context.Background())
// async supported processes.
q.showDownWg.Add(2)
+ q.locker = make([]int32, q.QueueCapacitySegments)
go q.segmentSwapper()
go q.flush()
q.ready = true
@@ -187,8 +188,7 @@ func (q *Queue) Dequeue() (*api.SequenceEvent, error) {
}
func (q *Queue) Close() error {
- q.lock.Lock()
- defer q.lock.Unlock()
+ q.ready = false
q.cancel()
q.showDownWg.Wait()
for i, segment := range q.segments {
@@ -204,7 +204,6 @@ func (q *Queue) Close() error {
if err := q.meta.Close(); err != nil {
log.Logger.Errorf("cannot unmap the metadata: %v", err)
}
- q.ready = false
return nil
}
@@ -226,8 +225,7 @@ func (q *Queue) Ack(lastOffset event.Offset) {
// flush control the flush operation by timer or counter.
func (q *Queue) flush() {
defer q.showDownWg.Done()
- ctx, cancel := context.WithCancel(q.ctx)
- defer cancel()
+ ctx, _ := context.WithCancel(q.ctx) // nolint
for {
timer := time.NewTimer(time.Duration(q.FlushPeriod) * time.Millisecond)
select {
@@ -245,13 +243,16 @@ func (q *Queue) flush() {
// doFlush flush the segment and meta files to the disk.
func (q *Queue) doFlush() {
- for _, segment := range q.segments {
- if segment == nil {
+ for i := range q.segments {
+ q.lockByIndex(i)
+ if q.segments[i] == nil {
+ q.unlockByIndex(i)
continue
}
- if err := segment.Flush(syscall.MS_SYNC); err != nil {
+ if err := q.segments[i].Flush(syscall.MS_SYNC); err != nil {
log.Logger.Errorf("cannot flush segment file: %v", err)
}
+ q.unlockByIndex(i)
}
wid, woffset := q.meta.GetWritingOffset()
q.meta.PutWatermarkOffset(wid, woffset)
diff --git a/plugins/queue/mmap/queue_lock.go b/plugins/queue/mmap/queue_lock.go
new file mode 100644
index 0000000..7e7c06b
--- /dev/null
+++ b/plugins/queue/mmap/queue_lock.go
@@ -0,0 +1,54 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import "sync/atomic"
+
+func (q *Queue) lock(segmentID int64) {
+ index := q.GetIndex(segmentID)
+ for {
+ if atomic.CompareAndSwapInt32(&q.locker[index], 0, 1) {
+ return
+ }
+ }
+}
+
+func (q *Queue) unlock(segmentID int64) {
+ index := q.GetIndex(segmentID)
+ for {
+ if atomic.CompareAndSwapInt32(&q.locker[index], 1, 0) {
+ return
+ }
+ }
+}
+
+func (q *Queue) lockByIndex(index int) {
+ for {
+ if atomic.CompareAndSwapInt32(&q.locker[index], 0, 1) {
+ return
+ }
+ }
+}
+
+func (q *Queue) unlockByIndex(index int) {
+ for {
+ if atomic.CompareAndSwapInt32(&q.locker[index], 1, 0) {
+ return
+ }
+ }
+}
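Note on the new lock file: the queue-wide `sync.Mutex` is replaced by one `int32` per segment slot, acquired with a compare-and-swap loop, so operations on different segments no longer serialize on a single lock. The sketch below shows the same idea in isolation and is not the code from this commit. `slotLocks`, `withSlot` and `countWithLocks` are hypothetical names, the spin yields with `runtime.Gosched`, the release is a plain atomic store, and `defer` is used so the slot is released on every return path.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// slotLocks is a hypothetical table of per-slot spin locks.
type slotLocks struct {
	state []int32 // 0 = free, 1 = held
}

func newSlotLocks(n int) *slotLocks {
	return &slotLocks{state: make([]int32, n)}
}

// lock spins until it flips slot i from free to held.
func (l *slotLocks) lock(i int) {
	for !atomic.CompareAndSwapInt32(&l.state[i], 0, 1) {
		runtime.Gosched() // yield to other goroutines instead of busy-spinning
	}
}

// unlock releases slot i; only the current holder should call it.
func (l *slotLocks) unlock(i int) {
	atomic.StoreInt32(&l.state[i], 0)
}

// withSlot runs fn while holding slot i and always releases the slot,
// including when fn returns an error.
func (l *slotLocks) withSlot(i int, fn func() error) error {
	l.lock(i)
	defer l.unlock(i)
	return fn()
}

// countWithLocks has several workers increment plain counters,
// each counter guarded only by the lock of its own slot.
func countWithLocks(slots, workers, perWorker int) []int {
	locks := newSlotLocks(slots)
	counts := make([]int, slots)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := 0; n < perWorker; n++ {
				i := n % slots
				_ = locks.withSlot(i, func() error {
					counts[i]++ // non-atomic increment, protected by slot i
					return nil
				})
			}
		}()
	}
	wg.Wait()
	return counts
}

func main() {
	fmt.Println(countWithLocks(4, 8, 1000))
}
```

A spin lock of this kind suits short critical sections such as a single read or write on a mapped segment. For longer sections, a `sync.Mutex` per slot would avoid burning CPU while waiting.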
diff --git a/plugins/queue/mmap/queue_operation.go b/plugins/queue/mmap/queue_operation.go
index 9ee6e74..d00df14 100644
--- a/plugins/queue/mmap/queue_operation.go
+++ b/plugins/queue/mmap/queue_operation.go
@@ -43,8 +43,6 @@ const uInt64Size = 8
// enqueue writes the data into the file system. It first writes the length of the data,
// then the data itself. It means the whole data may not exist in the one segments.
func (q *Queue) enqueue(bytes []byte) error {
- q.lock.Lock()
- defer q.lock.Unlock()
if q.isFull() {
return api.ErrFull
}
@@ -69,8 +67,6 @@ func (q *Queue) enqueue(bytes []byte) error {
// dequeue reads the data from the file system. It first reads the length of the data,
// then the data itself. It means the whole data may not exist in the one segments.
func (q *Queue) dequeue() (data []byte, rid, roffset int64, err error) {
- q.lock.Lock()
- defer q.lock.Unlock()
if q.isEmpty() {
return nil, 0, 0, api.ErrEmpty
}
@@ -95,11 +91,13 @@ func (q *Queue) readBytes(id, offset int64, length int) (data []byte, newID, new
counter := 0
res := make([]byte, length)
for {
+ q.lock(id)
segment, err := q.GetSegment(id)
if err != nil {
return nil, 0, 0, err
}
readBytes, err := segment.ReadAt(res[counter:], offset)
+ q.unlock(id)
if err != nil {
return nil, 0, 0, err
}
@@ -120,11 +118,13 @@ func (q *Queue) readLength(id, offset int64) (newID, newOffset int64, length int
if offset+uInt64Size > int64(q.SegmentSize) {
id, offset = id+1, 0
}
+ q.lock(id)
segment, err := q.GetSegment(id)
if err != nil {
return 0, 0, 0, err
}
num := segment.ReadUint64At(offset)
+ q.unlock(id)
offset += uInt64Size
if offset == int64(q.SegmentSize) {
id, offset = id+1, 0
@@ -137,11 +137,13 @@ func (q *Queue) writeLength(length int, id, offset int64) (newID, newOffset int6
if offset+uInt64Size > int64(q.SegmentSize) {
id, offset = id+1, 0
}
+ q.lock(id)
segment, err := q.GetSegment(id)
if err != nil {
return 0, 0, err
}
segment.WriteUint64At(uint64(length), offset)
+ q.unlock(id)
offset += uInt64Size
if offset == int64(q.SegmentSize) {
id, offset = id+1, 0
@@ -153,13 +155,14 @@ func (q *Queue) writeLength(length int, id, offset int64) (newID, newOffset int6
func (q *Queue) writeBytes(bytes []byte, id, offset int64) (newID, newOffset int64, err error) {
counter := 0
length := len(bytes)
-
for {
+ q.lock(id)
segment, err := q.GetSegment(id)
if err != nil {
return 0, 0, err
}
writtenBytes, err := segment.WriteAt(bytes[counter:], offset)
+ q.unlock(id)
if err != nil {
return 0, 0, err
}
diff --git a/plugins/queue/mmap/segment_operation.go b/plugins/queue/mmap/segment_operation.go
index 42d54b2..aa2f421 100644
--- a/plugins/queue/mmap/segment_operation.go
+++ b/plugins/queue/mmap/segment_operation.go
@@ -35,7 +35,7 @@ import (
// GetSegment returns a memory mapped file at the segmentID position.
func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {
- if q.mmapCount >= q.MaxInMemSegments {
+ if atomic.LoadInt32(&q.mmapCount) >= q.MaxInMemSegments {
q.insufficientMemChannel <- struct{}{}
<-q.sufficientMemChannel
}
@@ -85,14 +85,15 @@ func (q *Queue) unmapSegment(segmentID int64) error {
// segmentSwapper runs in a goroutine to keep the memory cost bounded.
func (q *Queue) segmentSwapper() {
defer q.showDownWg.Done()
- ctx, cancel := context.WithCancel(q.ctx)
- defer cancel()
+ ctx, _ := context.WithCancel(q.ctx) // nolint
for {
select {
case id := <-q.markReadChannel:
+ q.lock(id)
if q.unmapSegment(id) != nil {
log.Logger.Errorf("cannot unmap the markread segment: %d", id)
}
+ q.unlock(id)
case <-q.insufficientMemChannel:
if q.mmapCount >= q.MaxInMemSegments {
if q.doSwap() != nil {
diff --git a/protocol/gen-codes/skywalking/network/go.mod b/protocol/gen-codes/skywalking/network/go.mod
index 0797ac0..6f56c75 100644
--- a/protocol/gen-codes/skywalking/network/go.mod
+++ b/protocol/gen-codes/skywalking/network/go.mod
@@ -3,7 +3,7 @@ module skywalking/network
go 1.15
require (
- github.com/golang/protobuf v1.4.3
- google.golang.org/grpc v1.35.0
- google.golang.org/protobuf v1.25.0
+ github.com/golang/protobuf v1.5.2
+ google.golang.org/grpc v1.36.1
+ google.golang.org/protobuf v1.26.0
)
diff --git a/protocol/gen-codes/skywalking/network/go.sum b/protocol/gen-codes/skywalking/network/go.sum
index 8f0da6e..76f5239 100644
--- a/protocol/gen-codes/skywalking/network/go.sum
+++ b/protocol/gen-codes/skywalking/network/go.sum
@@ -19,14 +19,16 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
-github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
-github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
+github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
+github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@@ -67,8 +69,8 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
-google.golang.org/grpc v1.35.0 h1:TwIQcH3es+MojMVojxxfQ3l3OF2KzlRxML2xZq0kRo8=
-google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
+google.golang.org/grpc v1.36.1 h1:cmUfbeGKnz9+2DD/UYsMQXeqbHZqZDs4eQwW0sFOpBY=
+google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -77,8 +79,10 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
-google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
+google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=