Posted to notifications@skywalking.apache.org by li...@apache.org on 2021/04/07 07:31:56 UTC

[skywalking-satellite] 01/01: fix some bugs in Satellite

This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a commit to branch fix-race-bug
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git

commit f353fac01c867f9198f289c56132fb4d8ae8129a
Author: Evan <ev...@outlook.com>
AuthorDate: Wed Apr 7 15:30:48 2021 +0800

    fix some bugs in Satellite
---
 CHANGES.md                                         |  22 +---
 README.md                                          |   4 +-
 docs/en/concepts-and-designs/mmap-queue.md         |  24 ++--
 docs/en/guides/contribution/How-to-release.md      |   4 +-
 docs/en/guides/contribution/How-to-write-plugin.md |   2 +-
 go.mod                                             |   8 +-
 go.sum                                             |  14 ++-
 internal/satellite/module/sender/create.go         |   3 +-
 internal/satellite/module/sender/sender.go         | 138 ++++++++++++---------
 plugins/queue/mmap/meta/meta.go                    |  32 ++++-
 plugins/queue/mmap/queue.go                        |  19 +--
 plugins/queue/mmap/queue_lock.go                   |  54 ++++++++
 plugins/queue/mmap/queue_operation.go              |  13 +-
 plugins/queue/mmap/segment_operation.go            |   7 +-
 protocol/gen-codes/skywalking/network/go.mod       |   6 +-
 protocol/gen-codes/skywalking/network/go.sum       |  16 ++-
 16 files changed, 240 insertions(+), 126 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 1b4032e..be5f0f5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,25 +2,15 @@ Changes by Version
 ==================
 Release Notes.
 
-0.1.0
+0.2.0
 ------------------
 #### Features
-* Build the Satellite core structure.
-* Add prometheus self telemetry.
-* Add kafka client plugin.
-* Add none-fallbacker plugin.
-* Add timer-fallbacker plugin.
-* Add nativelog-kafka-forwarder plugin.
-* Add memory-queue plugin.
-* Add mmap-queue plugin.
-* Add grpc-nativelog-receiver plugin.
-* Add http-nativelog-receiver plugin.
-* Add grpc-server plugin.
-* Add http-server plugin.
-* Add prometheus-server plugin.
+
 
 #### Bug Fixes
+Fix the data race in mmap queue.
+Fix channel blocking in sender module. 
 
 #### Issues and PR
-- All issues  are [here](https://github.com/apache/skywalking/milestone/64?closed=1)  
-- All and pull requests are [here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Aopen+is%3Apr+milestone%3A0.1.0)
+- All issues  are [here](https://github.com/apache/skywalking/milestone/80?closed=1)  
+- All and pull requests are [here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Aopen+is%3Apr+milestone%3A0.2.0)
diff --git a/README.md b/README.md
index b1d97b7..6915593 100644
--- a/README.md
+++ b/README.md
@@ -21,7 +21,7 @@ NOTICE, SkyWalking Satellite uses [v3 protocols](https://github.com/apache/skywa
 Please read [the doc](./docs/en/FAQ/performance.md) to get more details.
 
 # Download
-Go to the [download page](https://skywalking.apache.org/downloads/) to download all available binaries, including MacOS, Linux and Windows. Due to system compatibility problems, some plugins of SkyWalking Satellite cannot be used in Windows system. Check [the corresponding documentation](./docs/en/guides/compile/How-to-compile.md) to see whether the plugin is available on Windows.
+Go to the [download page](https://skywalking.apache.org/downloads/) to download all available binaries, including MacOS, Linux and Windows. Due to system compatibility problems, some plugins of SkyWalking Satellite cannot be used in Windows system. Check [the corresponding documentation](./docs/en/setup/plugins) to see whether the plugin is available on Windows.
 
 # Compile
 As SkyWalking Satellite is using `Makefile`, compiling the project is as easy as executing a command in the root directory of the project.
@@ -32,7 +32,7 @@ git submodule init
 git submodule update
 make build
 ```
-If you want to know more details about compiling, please read [the doc](./docs/en/guides/compile/How-to-compile.md).
+If you want to know more details about compiling, please read [the doc](./docs/en/guides/compile/compile.md).
 
 
 # Commands
diff --git a/docs/en/concepts-and-designs/mmap-queue.md b/docs/en/concepts-and-designs/mmap-queue.md
index df0eaf0..d3b3582 100644
--- a/docs/en/concepts-and-designs/mmap-queue.md
+++ b/docs/en/concepts-and-designs/mmap-queue.md
@@ -41,12 +41,12 @@ goos: darwin
 goarch: amd64
 pkg: github.com/apache/skywalking-satellite/plugins/queue/mmap
 BenchmarkEnqueue
-BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000         	   10000	    106520 ns/op	    9888 B/op	       9 allocs/op
-BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000         	   18536	     54331 ns/op	    9839 B/op	       9 allocs/op
-BenchmarkEnqueue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000          	   27859	     43251 ns/op	    9815 B/op	       9 allocs/op
-BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000         	   23673	     45910 ns/op	    9839 B/op	       9 allocs/op
-BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000        	   10000	    131686 ns/op	   18941 B/op	      10 allocs/op
-BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000        	   23011	     47101 ns/op	    9887 B/op	       9 allocs/op
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000         	   27585	     43559 ns/op	    9889 B/op	       9 allocs/op
+BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000         	   39326	     31773 ns/op	    9840 B/op	       9 allocs/op
+BenchmarkEnqueue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000          	   56770	     22990 ns/op	    9816 B/op	       9 allocs/op
+BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000         	   43803	     29778 ns/op	    9840 B/op	       9 allocs/op
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000        	   16870	     80576 ns/op	   18944 B/op	      10 allocs/op
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000        	   36922	     39085 ns/op	    9889 B/op	       9 allocs/op
 PASS
 ```
 ### push and pop operation
@@ -55,11 +55,11 @@ goos: darwin
 goarch: amd64
 pkg: github.com/apache/skywalking-satellite/plugins/queue/mmap
 BenchmarkEnqueueAndDequeue
-BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000         	   18895	     53056 ns/op	   28773 B/op	      42 allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000         	   24104	    117128 ns/op	   28725 B/op	      42 allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000          	   23733	     71632 ns/op	   28699 B/op	      41 allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000         	   26286	     64377 ns/op	   28725 B/op	      42 allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000        	   10000	    118004 ns/op	   54978 B/op	      43 allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000        	   16489	     64400 ns/op	   28772 B/op	      42 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000         	   21030	     60728 ns/op	   28774 B/op	      42 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000         	   30327	     41274 ns/op	   28726 B/op	      42 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000          	   32738	     37923 ns/op	   28700 B/op	      42 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000         	   28209	     41169 ns/op	   28726 B/op	      42 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000        	   14677	     89637 ns/op	   54981 B/op	      43 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000        	   22228	     54963 ns/op	   28774 B/op	      42 allocs/op
 PASS
 ```
diff --git a/docs/en/guides/contribution/How-to-release.md b/docs/en/guides/contribution/How-to-release.md
index af07674..ad5dbd8 100644
--- a/docs/en/guides/contribution/How-to-release.md
+++ b/docs/en/guides/contribution/How-to-release.md
@@ -5,7 +5,7 @@ This documentation guides the release manager to release the SkyWalking Satellit
 ## Prerequisites
 
 1. Close(if finished, or move to next milestone otherwise) all issues in the current milestone from [skywalking-satellite](https://github.com/apache/skywalking-satellite/milestones) and [skywalking](https://github.com/apache/skywalking/milestones), create a new milestone if needed.
-2. Update [CHANGES.md](../../../../CHANGES.md).
+2. Update [CHANGES.md](../CHANGES.md).
 
 
 ## Add your GPG public key to Apache svn
@@ -152,7 +152,7 @@ are in `https://dist.apache.org/repos/dist/dev/skywalking/satellite/$VERSION` wi
 1. `LICENSE` and `NOTICE` are in source codes and distribution package.
 1. Check `shasum -c skywalking-satellite-$VERSION-{src,bin}.tgz.sha512`.
 1. Check `gpg --verify skywalking-satellite-$VERSION-{src,bin}.tgz.asc skywalking-satellite-$VERSION-{src,bin}.tgz`.
-1. Build distribution from source code package by following this command, `make build`.
+1. Build distribution from source code package by following this [the build guide](#build-and-sign-the-source-code-package).
 1. Licenses check, `make license`.
 
 Vote result should follow these:
diff --git a/docs/en/guides/contribution/How-to-write-plugin.md b/docs/en/guides/contribution/How-to-write-plugin.md
index 555f046..b2f4a61 100644
--- a/docs/en/guides/contribution/How-to-write-plugin.md
+++ b/docs/en/guides/contribution/How-to-write-plugin.md
@@ -32,7 +32,7 @@ Let's use memory-queue as an example of how to write a plugin.
     event_buffer_size: 5000
     ```
    
-3. Add [unit test](../test/How-to-unit-test.md).
+3. Add [unit test](../test/test.md).
 4. Generate the plugin docs.
 ```shell script
 make check
diff --git a/go.mod b/go.mod
index 0c7e00c..d1d86c4 100644
--- a/go.mod
+++ b/go.mod
@@ -7,14 +7,14 @@ replace skywalking/network v1.0.0 => ./protocol/gen-codes/skywalking/network
 require (
 	github.com/Shopify/sarama v1.27.2
 	github.com/enriquebris/goconcurrentqueue v0.6.0
-	github.com/golang/protobuf v1.4.3
-	github.com/google/go-cmp v0.5.4
+	github.com/golang/protobuf v1.5.2
+	github.com/google/go-cmp v0.5.5
 	github.com/grandecola/mmap v0.6.0
 	github.com/prometheus/client_golang v1.9.0
 	github.com/sirupsen/logrus v1.7.0
 	github.com/spf13/viper v1.7.1
 	github.com/urfave/cli/v2 v2.3.0
-	google.golang.org/grpc v1.35.0
-	google.golang.org/protobuf v1.25.0
+	google.golang.org/grpc v1.36.1
+	google.golang.org/protobuf v1.26.0
 	skywalking/network v1.0.0
 )
diff --git a/go.sum b/go.sum
index 13be86f..5ec45ee 100644
--- a/go.sum
+++ b/go.sum
@@ -134,6 +134,9 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD
 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
 github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
 github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
+github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
@@ -147,8 +150,8 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
 github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
-github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
+github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
 github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
@@ -589,8 +592,8 @@ google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQ
 google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
 google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg=
 google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
-google.golang.org/grpc v1.35.0 h1:TwIQcH3es+MojMVojxxfQ3l3OF2KzlRxML2xZq0kRo8=
-google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
+google.golang.org/grpc v1.36.1 h1:cmUfbeGKnz9+2DD/UYsMQXeqbHZqZDs4eQwW0sFOpBY=
+google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
 google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -601,6 +604,9 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
 google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
 google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
+google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
diff --git a/internal/satellite/module/sender/create.go b/internal/satellite/module/sender/create.go
index 5da3cca..12906e2 100644
--- a/internal/satellite/module/sender/create.go
+++ b/internal/satellite/module/sender/create.go
@@ -36,8 +36,7 @@ func NewSender(cfg *api.SenderConfig, g gatherer.Gatherer) api.Sender {
 		runningFallbacker: fallbacker.GetFallbacker(cfg.FallbackerConfig),
 		runningClient:     sharing.Manager[cfg.ClientName].(client.Client),
 		gatherer:          g,
-		logicInput:        nil,
-		physicalInput:     make(chan *event.OutputEventContext),
+		input:             make(chan *event.OutputEventContext),
 		listener:          make(chan client.ClientStatus),
 		flushChannel:      make(chan *buffer.BatchBuffer, 1),
 		buffer:            buffer.NewBatchBuffer(cfg.MaxBufferSize),
diff --git a/internal/satellite/module/sender/sender.go b/internal/satellite/module/sender/sender.go
index 86eec6d..e75e24c 100644
--- a/internal/satellite/module/sender/sender.go
+++ b/internal/satellite/module/sender/sender.go
@@ -20,6 +20,7 @@ package sender
 import (
 	"context"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/sirupsen/logrus"
@@ -51,11 +52,11 @@ type Sender struct {
 	gatherer gatherer.Gatherer
 
 	// self components
-	logicInput    chan *event.OutputEventContext // logic input channel
-	physicalInput chan *event.OutputEventContext // physical input channel
-	listener      chan client.ClientStatus       // client status listener
-	flushChannel  chan *buffer.BatchBuffer       // forwarder flush channel
-	buffer        *buffer.BatchBuffer            // cache the downstream input data
+	input        chan *event.OutputEventContext // physical input channel
+	listener     chan client.ClientStatus       // client status listener
+	flushChannel chan *buffer.BatchBuffer       // forwarder flush channel
+	buffer       *buffer.BatchBuffer            // cache the downstream input data
+	blocking     int32                          // the status of input channel
 
 	// metrics
 	sendCounter *telemetry.Counter
@@ -65,7 +66,6 @@ type Sender struct {
 func (s *Sender) Prepare() error {
 	log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is preparing...")
 	s.runningClient.RegisterListener(s.listener)
-	s.logicInput = s.physicalInput
 	for _, runningForwarder := range s.runningForwarders {
 		err := runningForwarder.Prepare(s.runningClient.GetConnectedClient())
 		if err != nil {
@@ -80,64 +80,90 @@ func (s *Sender) Prepare() error {
 func (s *Sender) Boot(ctx context.Context) {
 	log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is starting...")
 	var wg sync.WaitGroup
-	wg.Add(2)
-	// 1. keep fetching the downstream data when client connected, and put it into BatchBuffer.
-	// 2. When reaches the buffer limit or receives a timer flush signal, and put BatchBuffer into flushChannel.
-	go func() {
-		defer wg.Done()
-		childCtx, cancel := context.WithCancel(ctx)
-		timeTicker := time.NewTicker(time.Duration(s.config.FlushTime) * time.Millisecond)
-		for {
-			select {
-			case status := <-s.listener:
-				switch status {
-				case client.Connected:
-					log.Logger.WithField("pipe", s.config.PipeName).Info("the client connection of the sender module is connected")
-					s.logicInput = s.physicalInput
-				case client.Disconnect:
-					log.Logger.WithField("pipe", s.config.PipeName).Info("the client connection of the sender module is disconnected")
-					s.logicInput = nil
-				}
-			case <-timeTicker.C:
-				if s.buffer.Len() > s.config.MinFlushEvents {
-					s.flushChannel <- s.buffer
-					s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
-				}
-			case e := <-s.logicInput:
-				s.buffer.Add(e)
-				if s.buffer.Len() == s.config.MaxBufferSize {
-					s.flushChannel <- s.buffer
-					s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
-				}
-			case <-childCtx.Done():
-				cancel()
-				s.logicInput = nil
-				return
+	wg.Add(3)
+	go s.store(ctx, &wg)
+	go s.listen(ctx, &wg)
+	go s.flush(ctx, &wg)
+	wg.Wait()
+}
+
+// store data.
+// 1. keep fetching the downstream data when client connected, and put it into BatchBuffer.
+// 2. When reaches the buffer limit or receives a timer flush signal, and put BatchBuffer into flushChannel.
+func (s *Sender) store(ctx context.Context, wg *sync.WaitGroup) {
+	defer wg.Done()
+	defer log.Logger.WithField("pipe", s.config.PipeName).Infof("store routine closed")
+	childCtx, _ := context.WithCancel(ctx) // nolint
+	timeTicker := time.NewTicker(time.Duration(s.config.FlushTime) * time.Millisecond)
+	for {
+		// blocking output when disconnecting.
+		if atomic.LoadInt32(&s.blocking) == 1 {
+			time.Sleep(100 * time.Millisecond)
+			continue
+		}
+		select {
+		case <-childCtx.Done():
+			return
+		case <-timeTicker.C:
+			if s.buffer.Len() > s.config.MinFlushEvents {
+				s.flushChannel <- s.buffer
+				s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
+			}
+		case e := <-s.input:
+			if e == nil {
+				continue
+			}
+			s.buffer.Add(e)
+			if s.buffer.Len() == s.config.MaxBufferSize {
+				s.flushChannel <- s.buffer
+				s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
 			}
 		}
-	}()
-	// Keep fetching BatchBuffer to forward.
-	go func() {
-		defer wg.Done()
-		childCtx, cancel := context.WithCancel(ctx)
-		for {
-			select {
-			case b := <-s.flushChannel:
-				s.consume(b)
-			case <-childCtx.Done():
-				cancel()
-				s.Shutdown()
-				return
+	}
+}
+
+// Listen the client status.
+func (s *Sender) listen(ctx context.Context, wg *sync.WaitGroup) {
+	defer wg.Done()
+	defer log.Logger.WithField("pipe", s.config.PipeName).Infof("listen routine closed")
+	childCtx, _ := context.WithCancel(ctx) // nolint
+	for {
+		select {
+		case <-childCtx.Done():
+			return
+		case status := <-s.listener:
+			switch status {
+			case client.Connected:
+				log.Logger.WithField("pipe", s.config.PipeName).Info("the client connection of the sender module connected")
+				atomic.StoreInt32(&s.blocking, 0)
+			case client.Disconnect:
+				log.Logger.WithField("pipe", s.config.PipeName).Info("the client connection of the sender module disconnected")
+				atomic.StoreInt32(&s.blocking, 1)
 			}
 		}
-	}()
-	wg.Wait()
+	}
+}
+
+// Keep fetching BatchBuffer to forward.
+func (s *Sender) flush(ctx context.Context, wg *sync.WaitGroup) {
+	defer wg.Done()
+	defer log.Logger.WithField("pipe", s.config.PipeName).Infof("flush routine closed")
+	childCtx, _ := context.WithCancel(ctx) // nolint
+	for {
+		select {
+		case <-childCtx.Done():
+			s.Shutdown()
+			return
+		case b := <-s.flushChannel:
+			s.consume(b)
+		}
+	}
 }
 
 // Shutdown closes the channels and tries to force forward the events in the buffer.
 func (s *Sender) Shutdown() {
 	log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is closing")
-	close(s.physicalInput)
+	close(s.input)
 	ticker := time.NewTicker(module.ShutdownHookTime)
 	for {
 		select {
@@ -187,5 +213,5 @@ func (s *Sender) consume(batch *buffer.BatchBuffer) {
 }
 
 func (s *Sender) InputDataChannel() chan<- *event.OutputEventContext {
-	return s.logicInput
+	return s.input
 }
diff --git a/plugins/queue/mmap/meta/meta.go b/plugins/queue/mmap/meta/meta.go
index 11f75fa..36a5faa 100644
--- a/plugins/queue/mmap/meta/meta.go
+++ b/plugins/queue/mmap/meta/meta.go
@@ -22,6 +22,7 @@ package meta
 import (
 	"fmt"
 	"path/filepath"
+	"sync"
 	"syscall"
 
 	"github.com/grandecola/mmap"
@@ -60,6 +61,7 @@ type Metadata struct {
 	name     string
 	size     int
 	capacity int
+	lock     sync.RWMutex
 }
 
 // NewMetaData read or create a Metadata with supported metaVersion
@@ -92,76 +94,104 @@ func NewMetaData(metaDir string, capacity int) (*Metadata, error) {
 
 // GetVersion returns the meta version.
 func (m *Metadata) GetVersion() int {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
 	return int(m.metaFile.ReadUint64At(versionPos))
 }
 
 // PutVersion put the version into the memory mapped file.
 func (m *Metadata) PutVersion(version int64) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
 	m.metaFile.WriteUint64At(uint64(version), versionPos)
 }
 
 // GetWritingOffset returns the writing offset, which contains the segment ID and the offset of the segment.
 func (m *Metadata) GetWritingOffset() (segmentID, offset int64) {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
 	return int64(m.metaFile.ReadUint64At(widPos)), int64(m.metaFile.ReadUint64At(woffsetPos))
 }
 
 // PutWritingOffset put the segment ID and the offset of the segment into the writing offset.
 func (m *Metadata) PutWritingOffset(segmentID, offset int64) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
 	m.metaFile.WriteUint64At(uint64(segmentID), widPos)
 	m.metaFile.WriteUint64At(uint64(offset), woffsetPos)
 }
 
 // GetWatermarkOffset returns the watermark offset, which contains the segment ID and the offset of the segment.
 func (m *Metadata) GetWatermarkOffset() (segmentID, offset int64) {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
 	return int64(m.metaFile.ReadUint64At(wmidPos)), int64(m.metaFile.ReadUint64At(wmoffsetPos))
 }
 
 // PutWatermarkOffset put the segment ID and the offset of the segment into the watermark offset.
 func (m *Metadata) PutWatermarkOffset(segmentID, offset int64) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
 	m.metaFile.WriteUint64At(uint64(segmentID), wmidPos)
 	m.metaFile.WriteUint64At(uint64(offset), wmoffsetPos)
 }
 
 // GetCommittedOffset returns the committed offset, which contains the segment ID and the offset of the segment.
 func (m *Metadata) GetCommittedOffset() (segmentID, offset int64) {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
 	return int64(m.metaFile.ReadUint64At(cidPos)), int64(m.metaFile.ReadUint64At(coffsetPos))
 }
 
 // PutCommittedOffset put the segment ID and the offset of the segment into the committed offset.
 func (m *Metadata) PutCommittedOffset(segmentID, offset int64) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
 	m.metaFile.WriteUint64At(uint64(segmentID), cidPos)
 	m.metaFile.WriteUint64At(uint64(offset), coffsetPos)
 }
 
 // GetReadingOffset returns the reading offset, which contains the segment ID and the offset of the segment.
 func (m *Metadata) GetReadingOffset() (segmentID, offset int64) {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
 	return int64(m.metaFile.ReadUint64At(ridPos)), int64(m.metaFile.ReadUint64At(roffsetPos))
 }
 
 // PutReadingOffset put the segment ID and the offset of the segment into the reading offset.
 func (m *Metadata) PutReadingOffset(segmentID, offset int64) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
 	m.metaFile.WriteUint64At(uint64(segmentID), ridPos)
 	m.metaFile.WriteUint64At(uint64(offset), roffsetPos)
 }
 
 // GetCapacity returns the capacity of the queue.
 func (m *Metadata) GetCapacity() int {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
 	return int(m.metaFile.ReadUint64At(capacityPos))
 }
 
 // PutCapacity put the capacity into the memory mapped file.
 func (m *Metadata) PutCapacity(version int64) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
 	m.metaFile.WriteUint64At(uint64(version), capacityPos)
 }
 
 // Flush the memory mapped file to the disk.
 func (m *Metadata) Flush() error {
+	m.lock.Lock()
+	defer m.lock.Unlock()
 	return m.metaFile.Flush(syscall.MS_SYNC)
 }
 
 // Close do Flush operation and unmap the memory mapped file.
 func (m *Metadata) Close() error {
-	if err := m.Flush(); err != nil {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	if err := m.metaFile.Flush(syscall.MS_SYNC); err != nil {
 		return err
 	}
 	return m.metaFile.Unmap()
diff --git a/plugins/queue/mmap/queue.go b/plugins/queue/mmap/queue.go
index 44d20ee..539c59f 100644
--- a/plugins/queue/mmap/queue.go
+++ b/plugins/queue/mmap/queue.go
@@ -61,7 +61,6 @@ type Queue struct {
 	MaxEventSize          int   `mapstructure:"max_event_size"`          // The max size of the input event.
 
 	// running components
-	lock                   sync.Mutex
 	queueName              string         // The queue name.
 	meta                   *meta.Metadata // The metadata file.
 	segments               []*mmap.File   // The data files.
@@ -72,6 +71,7 @@ type Queue struct {
 	sufficientMemChannel   chan struct{}  // Notify when memory is sufficient
 	markReadChannel        chan int64     // Transfer the read segmentID to do ummap operation.
 	ready                  bool           // The status of the queue.
+	locker                 []int32        // locker
 
 	// control components
 	ctx        context.Context    // Parent ctx
@@ -145,6 +145,7 @@ func (q *Queue) Initialize() error {
 	q.ctx, q.cancel = context.WithCancel(context.Background())
 	// async supported processes.
 	q.showDownWg.Add(2)
+	q.locker = make([]int32, q.QueueCapacitySegments)
 	go q.segmentSwapper()
 	go q.flush()
 	q.ready = true
@@ -187,8 +188,7 @@ func (q *Queue) Dequeue() (*api.SequenceEvent, error) {
 }
 
 func (q *Queue) Close() error {
-	q.lock.Lock()
-	defer q.lock.Unlock()
+	q.ready = false
 	q.cancel()
 	q.showDownWg.Wait()
 	for i, segment := range q.segments {
@@ -204,7 +204,6 @@ func (q *Queue) Close() error {
 	if err := q.meta.Close(); err != nil {
 		log.Logger.Errorf("cannot unmap the metadata: %v", err)
 	}
-	q.ready = false
 	return nil
 }
 
@@ -226,8 +225,7 @@ func (q *Queue) Ack(lastOffset event.Offset) {
 // flush control the flush operation by timer or counter.
 func (q *Queue) flush() {
 	defer q.showDownWg.Done()
-	ctx, cancel := context.WithCancel(q.ctx)
-	defer cancel()
+	ctx, _ := context.WithCancel(q.ctx) // nolint
 	for {
 		timer := time.NewTimer(time.Duration(q.FlushPeriod) * time.Millisecond)
 		select {
@@ -245,13 +243,16 @@ func (q *Queue) flush() {
 
 // doFlush flush the segment and meta files to the disk.
 func (q *Queue) doFlush() {
-	for _, segment := range q.segments {
-		if segment == nil {
+	for i := range q.segments {
+		q.lockByIndex(i)
+		if q.segments[i] == nil {
+			q.unlockByIndex(i)
 			continue
 		}
-		if err := segment.Flush(syscall.MS_SYNC); err != nil {
+		if err := q.segments[i].Flush(syscall.MS_SYNC); err != nil {
 			log.Logger.Errorf("cannot flush segment file: %v", err)
 		}
+		q.unlockByIndex(i)
 	}
 	wid, woffset := q.meta.GetWritingOffset()
 	q.meta.PutWatermarkOffset(wid, woffset)
diff --git a/plugins/queue/mmap/queue_lock.go b/plugins/queue/mmap/queue_lock.go
new file mode 100644
index 0000000..7e7c06b
--- /dev/null
+++ b/plugins/queue/mmap/queue_lock.go
@@ -0,0 +1,54 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import "sync/atomic"
+
+func (q *Queue) lock(segmentID int64) {
+	index := q.GetIndex(segmentID)
+	for {
+		if atomic.CompareAndSwapInt32(&q.locker[index], 0, 1) {
+			return
+		}
+	}
+}
+
+func (q *Queue) unlock(segmentID int64) {
+	index := q.GetIndex(segmentID)
+	for {
+		if atomic.CompareAndSwapInt32(&q.locker[index], 1, 0) {
+			return
+		}
+	}
+}
+
+func (q *Queue) lockByIndex(index int) {
+	for {
+		if atomic.CompareAndSwapInt32(&q.locker[index], 0, 1) {
+			return
+		}
+	}
+}
+
+func (q *Queue) unlockByIndex(index int) {
+	for {
+		if atomic.CompareAndSwapInt32(&q.locker[index], 1, 0) {
+			return
+		}
+	}
+}
diff --git a/plugins/queue/mmap/queue_operation.go b/plugins/queue/mmap/queue_operation.go
index 9ee6e74..d00df14 100644
--- a/plugins/queue/mmap/queue_operation.go
+++ b/plugins/queue/mmap/queue_operation.go
@@ -43,8 +43,6 @@ const uInt64Size = 8
 // enqueue writes the data into the file system. It first writes the length of the data,
 // then the data itself. It means the whole data may not exist in the one segments.
 func (q *Queue) enqueue(bytes []byte) error {
-	q.lock.Lock()
-	defer q.lock.Unlock()
 	if q.isFull() {
 		return api.ErrFull
 	}
@@ -69,8 +67,6 @@ func (q *Queue) enqueue(bytes []byte) error {
 // dequeue reads the data from the file system. It first reads the length of the data,
 // then the data itself. It means the whole data may not exist in the one segments.
 func (q *Queue) dequeue() (data []byte, rid, roffset int64, err error) {
-	q.lock.Lock()
-	defer q.lock.Unlock()
 	if q.isEmpty() {
 		return nil, 0, 0, api.ErrEmpty
 	}
@@ -95,11 +91,13 @@ func (q *Queue) readBytes(id, offset int64, length int) (data []byte, newID, new
 	counter := 0
 	res := make([]byte, length)
 	for {
+		q.lock(id)
 		segment, err := q.GetSegment(id)
 		if err != nil {
 			return nil, 0, 0, err
 		}
 		readBytes, err := segment.ReadAt(res[counter:], offset)
+		q.unlock(id)
 		if err != nil {
 			return nil, 0, 0, err
 		}
@@ -120,11 +118,13 @@ func (q *Queue) readLength(id, offset int64) (newID, newOffset int64, length int
 	if offset+uInt64Size > int64(q.SegmentSize) {
 		id, offset = id+1, 0
 	}
+	q.lock(id)
 	segment, err := q.GetSegment(id)
 	if err != nil {
 		return 0, 0, 0, err
 	}
 	num := segment.ReadUint64At(offset)
+	q.unlock(id)
 	offset += uInt64Size
 	if offset == int64(q.SegmentSize) {
 		id, offset = id+1, 0
@@ -137,11 +137,13 @@ func (q *Queue) writeLength(length int, id, offset int64) (newID, newOffset int6
 	if offset+uInt64Size > int64(q.SegmentSize) {
 		id, offset = id+1, 0
 	}
+	q.lock(id)
 	segment, err := q.GetSegment(id)
 	if err != nil {
 		return 0, 0, err
 	}
 	segment.WriteUint64At(uint64(length), offset)
+	q.unlock(id)
 	offset += uInt64Size
 	if offset == int64(q.SegmentSize) {
 		id, offset = id+1, 0
@@ -153,13 +155,14 @@ func (q *Queue) writeLength(length int, id, offset int64) (newID, newOffset int6
 func (q *Queue) writeBytes(bytes []byte, id, offset int64) (newID, newOffset int64, err error) {
 	counter := 0
 	length := len(bytes)
-
 	for {
+		q.lock(id)
 		segment, err := q.GetSegment(id)
 		if err != nil {
 			return 0, 0, err
 		}
 		writtenBytes, err := segment.WriteAt(bytes[counter:], offset)
+		q.unlock(id)
 		if err != nil {
 			return 0, 0, err
 		}
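
Review note on queue_operation.go: the enqueue and dequeue comments describe a length-prefixed layout in which a record may continue into the next segment. The following is a minimal in-memory sketch of that layout, assuming plain byte slices in place of memory mapped files and big-endian encoding for the prefix (the byte order used by the real segment type is not visible in this diff). The names store, writeRecord, readRecord and the tiny segmentSize are hypothetical, and the sketch does no capacity checking.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const (
	segmentSize = 16 // hypothetical; small so that records spill over
	lengthSize  = 8  // size of the uint64 length prefix
)

// store is a hypothetical stand-in for the segment files.
type store struct{ segments [][]byte }

func newStore(n int) *store {
	s := &store{segments: make([][]byte, n)}
	for i := range s.segments {
		s.segments[i] = make([]byte, segmentSize)
	}
	return s
}

// prefixPos skips to the next segment when the length prefix would
// not fit in the remainder of the current one.
func prefixPos(id, offset int) (int, int) {
	if offset+lengthSize > segmentSize {
		return id + 1, 0
	}
	return id, offset
}

// advance moves the cursor by n bytes and rolls over to the next
// segment when the current one is exactly full.
func advance(id, offset, n int) (int, int) {
	offset += n
	if offset == segmentSize {
		return id + 1, 0
	}
	return id, offset
}

// writeRecord writes the length, then the payload, splitting the
// payload across segments as needed. It returns the next cursor.
func (s *store) writeRecord(data []byte, id, offset int) (int, int) {
	id, offset = prefixPos(id, offset)
	binary.BigEndian.PutUint64(s.segments[id][offset:], uint64(len(data)))
	id, offset = advance(id, offset, lengthSize)
	for written := 0; written < len(data); {
		n := copy(s.segments[id][offset:], data[written:])
		written += n
		id, offset = advance(id, offset, n)
	}
	return id, offset
}

// readRecord mirrors writeRecord: read the length, then gather the
// payload from one or more segments.
func (s *store) readRecord(id, offset int) ([]byte, int, int) {
	id, offset = prefixPos(id, offset)
	length := int(binary.BigEndian.Uint64(s.segments[id][offset:]))
	id, offset = advance(id, offset, lengthSize)
	data := make([]byte, length)
	for read := 0; read < length; {
		n := copy(data[read:], s.segments[id][offset:])
		read += n
		id, offset = advance(id, offset, n)
	}
	return data, id, offset
}

func main() {
	s := newStore(4)
	id, off := s.writeRecord([]byte("hello, satellite"), 0, 0)
	s.writeRecord([]byte("ok"), id, off)

	first, rid, roff := s.readRecord(0, 0)
	second, _, _ := s.readRecord(rid, roff)
	fmt.Printf("%s / %s\n", first, second)
}
```

With a 16-byte segment, the first record places its prefix and half of its payload in segment 0 and the rest in segment 1, which is the case the per-segment locking in this commit has to handle.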
diff --git a/plugins/queue/mmap/segment_operation.go b/plugins/queue/mmap/segment_operation.go
index 42d54b2..aa2f421 100644
--- a/plugins/queue/mmap/segment_operation.go
+++ b/plugins/queue/mmap/segment_operation.go
@@ -35,7 +35,7 @@ import (
 
 // GetSegment returns a memory mapped file at the segmentID position.
 func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {
-	if q.mmapCount >= q.MaxInMemSegments {
+	if atomic.LoadInt32(&q.mmapCount) >= q.MaxInMemSegments {
 		q.insufficientMemChannel <- struct{}{}
 		<-q.sufficientMemChannel
 	}
@@ -85,14 +85,15 @@ func (q *Queue) unmapSegment(segmentID int64) error {
 // segmentSwapper run with a go routine to ensure the memory cost.
 func (q *Queue) segmentSwapper() {
 	defer q.showDownWg.Done()
-	ctx, cancel := context.WithCancel(q.ctx)
-	defer cancel()
+	ctx, _ := context.WithCancel(q.ctx) // nolint
 	for {
 		select {
 		case id := <-q.markReadChannel:
+			q.lock(id)
 			if q.unmapSegment(id) != nil {
 				log.Logger.Errorf("cannot unmap the markread segment: %d", id)
 			}
+			q.unlock(id)
 		case <-q.insufficientMemChannel:
 			if q.mmapCount >= q.MaxInMemSegments {
 				if q.doSwap() != nil {
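
Review note on segment_operation.go: GetSegment now reads q.mmapCount through atomic.LoadInt32, while the insufficientMemChannel branch of segmentSwapper still reads the field directly. Atomic access only helps when every reader and writer of the field uses it. The following is a minimal sketch of a counter that keeps all access behind atomic calls. The type segmentCounter and its methods are hypothetical, and the sketch assumes the count is an int32, as the atomic.LoadInt32 call in the diff implies.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// segmentCounter is a hypothetical wrapper around the number of
// mapped segments. The field is only touched through sync/atomic.
type segmentCounter struct{ n int32 }

// inc records one more mapped segment and returns the new count.
func (c *segmentCounter) inc() int32 { return atomic.AddInt32(&c.n, 1) }

// dec records one fewer mapped segment and returns the new count.
func (c *segmentCounter) dec() int32 { return atomic.AddInt32(&c.n, -1) }

// atLimit reports whether the count has reached the given limit.
func (c *segmentCounter) atLimit(limit int32) bool {
	return atomic.LoadInt32(&c.n) >= limit
}

func main() {
	var c segmentCounter
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.inc() // concurrent increments are safe
		}()
	}
	wg.Wait()

	fmt.Println(c.atLimit(10)) // limit reached after ten increments
	c.dec()
	fmt.Println(c.atLimit(10)) // one segment unmapped, below the limit
}
```

Hiding the field behind methods makes a plain, non-atomic read impossible to write by accident.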
diff --git a/protocol/gen-codes/skywalking/network/go.mod b/protocol/gen-codes/skywalking/network/go.mod
index 0797ac0..6f56c75 100644
--- a/protocol/gen-codes/skywalking/network/go.mod
+++ b/protocol/gen-codes/skywalking/network/go.mod
@@ -3,7 +3,7 @@ module skywalking/network
 go 1.15
 
 require (
-	github.com/golang/protobuf v1.4.3
-	google.golang.org/grpc v1.35.0
-	google.golang.org/protobuf v1.25.0
+	github.com/golang/protobuf v1.5.2
+	google.golang.org/grpc v1.36.1
+	google.golang.org/protobuf v1.26.0
 )
diff --git a/protocol/gen-codes/skywalking/network/go.sum b/protocol/gen-codes/skywalking/network/go.sum
index 8f0da6e..76f5239 100644
--- a/protocol/gen-codes/skywalking/network/go.sum
+++ b/protocol/gen-codes/skywalking/network/go.sum
@@ -19,14 +19,16 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W
 github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
 github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
-github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
-github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
+github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
 github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
+github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@@ -67,8 +69,8 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi
 google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
 google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
 google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
-google.golang.org/grpc v1.35.0 h1:TwIQcH3es+MojMVojxxfQ3l3OF2KzlRxML2xZq0kRo8=
-google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
+google.golang.org/grpc v1.36.1 h1:cmUfbeGKnz9+2DD/UYsMQXeqbHZqZDs4eQwW0sFOpBY=
+google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
 google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -77,8 +79,10 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
 google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
-google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
 google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
+google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=